From 464ddcec24d0c8c1554a31427560489b55803fd2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 24 Sep 2014 19:05:04 -0700 Subject: [PATCH 01/13] cherry-pick rxin's commit --- .../apache/spark/util/collection/Sorter.java | 62 ++++++++++++------- .../util/collection/SortDataFormat.scala | 6 ++ 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java index 64ad18c0e463a..9754d536de256 100644 --- a/core/src/main/java/org/apache/spark/util/collection/Sorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java @@ -162,10 +162,13 @@ private void binarySort(Buffer a, int lo, int hi, int start, Comparator>> 1; - if (c.compare(pivot, s.getKey(a, mid)) < 0) + if (c.compare(pivot, s.getKey(a, mid, mutableThiny2)) < 0) right = mid; else left = mid + 1; @@ -235,13 +238,16 @@ private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator= 0) + while (runHi < hi && c.compare(s.getKey(a, runHi, mutableThiny1), s.getKey(a, runHi - 1, mutableThiny2)) >= 0) runHi++; } @@ -468,11 +474,14 @@ private void mergeAt(int i) { } stackSize--; + + K mutableThiny1 = s.createNewMutableThingy(); + /* * Find where the first element of run2 goes in run1. Prior elements * in run1 can be ignored (because they're already in place). */ - int k = gallopRight(s.getKey(a, base2), a, base1, len1, 0, c); + int k = gallopRight(s.getKey(a, base2, mutableThiny1), a, base1, len1, 0, c); assert k >= 0; base1 += k; len1 -= k; @@ -483,7 +492,7 @@ private void mergeAt(int i) { * Find where the last element of run1 goes in run2. Subsequent elements * in run2 can be ignored (because they're already in place). */ - len2 = gallopLeft(s.getKey(a, base1 + len1 - 1), a, base2, len2, len2 - 1, c); + len2 = gallopLeft(s.getKey(a, base1 + len1 - 1, mutableThiny1), a, base2, len2, len2 - 1, c); assert len2 >= 0; if (len2 == 0) return; @@ -517,10 +526,12 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< assert len > 0 && hint >= 0 && hint < len; int lastOfs = 0; int ofs = 1; - if (c.compare(key, s.getKey(a, base + hint)) > 0) { + K mutableThiny1 = s.createNewMutableThingy(); + + if (c.compare(key, s.getKey(a, base + hint, mutableThiny1)) > 0) { // Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs] int maxOfs = len - hint; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) > 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, mutableThiny1)) > 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -535,7 +546,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< } else { // key <= a[base + hint] // Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs] final int maxOfs = hint + 1; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) <= 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, mutableThiny1)) <= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -560,7 +571,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); - if (c.compare(key, s.getKey(a, base + m)) > 0) + if (c.compare(key, s.getKey(a, base + m, mutableThiny1)) > 0) lastOfs = m + 1; // a[base + m] < key else ofs = m; // key <= a[base + m] @@ -587,10 +598,12 @@ private int gallopRight(K key, Buffer a, 
int base, int len, int hint, Comparator int ofs = 1; int lastOfs = 0; - if (c.compare(key, s.getKey(a, base + hint)) < 0) { + K mutableThiny1 = s.createNewMutableThingy(); + + if (c.compare(key, s.getKey(a, base + hint, mutableThiny1)) < 0) { // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs] int maxOfs = hint + 1; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) < 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, mutableThiny1)) < 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -606,7 +619,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator } else { // a[b + hint] <= key // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs] int maxOfs = len - hint; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) >= 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, mutableThiny1)) >= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -630,7 +643,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); - if (c.compare(key, s.getKey(a, base + m)) < 0) + if (c.compare(key, s.getKey(a, base + m, mutableThiny1)) < 0) ofs = m; // key < a[b + m] else lastOfs = m + 1; // a[b + m] <= key @@ -679,6 +692,9 @@ private void mergeLo(int base1, int len1, int base2, int len2) { return; } + K mutableThiny1 = s.createNewMutableThingy(); + K mutableThiny2 = s.createNewMutableThingy(); + Comparator c = this.c; // Use local variable for performance int minGallop = this.minGallop; // " " " " " outer: @@ -692,7 +708,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { */ do { assert len1 > 1 && len2 > 0; - if (c.compare(s.getKey(a, cursor2), s.getKey(tmp, cursor1)) < 0) { + if (c.compare(s.getKey(a, cursor2, mutableThiny1), s.getKey(tmp, cursor1, mutableThiny2)) < 0) { s.copyElement(a, cursor2++, a, dest++); count2++; count1 = 0; @@ -714,7 +730,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { */ do { assert len1 > 1 && len2 > 0; - count1 = gallopRight(s.getKey(a, cursor2), tmp, cursor1, len1, 0, c); + count1 = gallopRight(s.getKey(a, cursor2, mutableThiny1), tmp, cursor1, len1, 0, c); if (count1 != 0) { s.copyRange(tmp, cursor1, a, dest, count1); dest += count1; @@ -727,7 +743,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { if (--len2 == 0) break outer; - count2 = gallopLeft(s.getKey(tmp, cursor1), a, cursor2, len2, 0, c); + count2 = gallopLeft(s.getKey(tmp, cursor1, mutableThiny1), a, cursor2, len2, 0, c); if (count2 != 0) { s.copyRange(a, cursor2, a, dest, count2); dest += count2; @@ -784,6 +800,10 @@ private void mergeHi(int base1, int len1, int base2, int len2) { int cursor2 = len2 - 1; // Indexes into tmp array int dest = base2 + len2 - 1; // Indexes into a + K mutableThiny1 = s.createNewMutableThingy(); + K mutableThiny2 = s.createNewMutableThingy(); + + // Move last element of first run and deal with degenerate cases s.copyElement(a, cursor1--, a, dest--); if (--len1 == 0) { @@ -811,7 +831,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) { */ do { assert len1 > 0 && len2 > 1; - if (c.compare(s.getKey(tmp, cursor2), s.getKey(a, cursor1)) < 0) { + if (c.compare(s.getKey(tmp, cursor2, mutableThiny1), s.getKey(a, cursor1, mutableThiny2)) < 0) { s.copyElement(a, cursor1--, a, dest--); count1++; count2 = 0; @@ -833,7 +853,7 @@ private void 
mergeHi(int base1, int len1, int base2, int len2) { */ do { assert len1 > 0 && len2 > 1; - count1 = len1 - gallopRight(s.getKey(tmp, cursor2), a, base1, len1, len1 - 1, c); + count1 = len1 - gallopRight(s.getKey(tmp, cursor2, mutableThiny1), a, base1, len1, len1 - 1, c); if (count1 != 0) { dest -= count1; cursor1 -= count1; @@ -846,7 +866,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) { if (--len2 == 1) break outer; - count2 = len2 - gallopLeft(s.getKey(a, cursor1), tmp, 0, len2, len2 - 1, c); + count2 = len2 - gallopLeft(s.getKey(a, cursor1, mutableThiny1), tmp, 0, len2, len2 - 1, c); if (count2 != 0) { dest -= count2; cursor2 -= count2; diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index ac1528969f0be..1eaa0491c4b0d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -37,6 +37,12 @@ private[spark] trait SortDataFormat[K, Buffer] extends Any { /** Return the sort key for the element at the given index. */ protected def getKey(data: Buffer, pos: Int): K + protected def createNewMutableThingy(): K = null.asInstanceOf[K] + + protected def getKey(data: Buffer, pos: Int, mutableThingy: K): K = { + getKey(data, pos) + } + /** Swap two elements. */ protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit From cf94e8adf717cdedf3c4b6e99d513ad5bd6da572 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 24 Oct 2014 14:32:49 -0700 Subject: [PATCH 02/13] renaming --- .../apache/spark/util/collection/Sorter.java | 66 +++++++++---------- .../util/collection/SortDataFormat.scala | 11 +++- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java index 9754d536de256..7d4dba85b2489 100644 --- a/core/src/main/java/org/apache/spark/util/collection/Sorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java @@ -162,13 +162,13 @@ private void binarySort(Buffer a, int lo, int hi, int start, Comparator>> 1; - if (c.compare(pivot, s.getKey(a, mid, mutableThiny2)) < 0) + if (c.compare(pivot, s.getKey(a, mid, key1)) < 0) right = mid; else left = mid + 1; @@ -238,16 +238,16 @@ private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator= 0) + while (runHi < hi && c.compare(s.getKey(a, runHi, key1), s.getKey(a, runHi - 1, key2)) >= 0) runHi++; } @@ -474,14 +474,13 @@ private void mergeAt(int i) { } stackSize--; - - K mutableThiny1 = s.createNewMutableThingy(); + K key0 = s.newKey(); /* * Find where the first element of run2 goes in run1. Prior elements * in run1 can be ignored (because they're already in place). */ - int k = gallopRight(s.getKey(a, base2, mutableThiny1), a, base1, len1, 0, c); + int k = gallopRight(s.getKey(a, base2, key0), a, base1, len1, 0, c); assert k >= 0; base1 += k; len1 -= k; @@ -492,7 +491,7 @@ private void mergeAt(int i) { * Find where the last element of run1 goes in run2. Subsequent elements * in run2 can be ignored (because they're already in place). 
*/ - len2 = gallopLeft(s.getKey(a, base1 + len1 - 1, mutableThiny1), a, base2, len2, len2 - 1, c); + len2 = gallopLeft(s.getKey(a, base1 + len1 - 1, key0), a, base2, len2, len2 - 1, c); assert len2 >= 0; if (len2 == 0) return; @@ -526,12 +525,12 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< assert len > 0 && hint >= 0 && hint < len; int lastOfs = 0; int ofs = 1; - K mutableThiny1 = s.createNewMutableThingy(); + K key0 = s.newKey(); - if (c.compare(key, s.getKey(a, base + hint, mutableThiny1)) > 0) { + if (c.compare(key, s.getKey(a, base + hint, key0)) > 0) { // Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs] int maxOfs = len - hint; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, mutableThiny1)) > 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, key0)) > 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -546,7 +545,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< } else { // key <= a[base + hint] // Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs] final int maxOfs = hint + 1; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, mutableThiny1)) <= 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, key0)) <= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -571,7 +570,7 @@ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator< while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); - if (c.compare(key, s.getKey(a, base + m, mutableThiny1)) > 0) + if (c.compare(key, s.getKey(a, base + m, key0)) > 0) lastOfs = m + 1; // a[base + m] < key else ofs = m; // key <= a[base + m] @@ -598,12 +597,12 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator int ofs = 1; int lastOfs = 0; - K mutableThiny1 = s.createNewMutableThingy(); + K key1 = s.newKey(); - if (c.compare(key, s.getKey(a, base + hint, mutableThiny1)) < 0) { + if (c.compare(key, s.getKey(a, base + hint, key1)) < 0) { // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs] int maxOfs = hint + 1; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, mutableThiny1)) < 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs, key1)) < 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -619,7 +618,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator } else { // a[b + hint] <= key // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs] int maxOfs = len - hint; - while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, mutableThiny1)) >= 0) { + while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs, key1)) >= 0) { lastOfs = ofs; ofs = (ofs << 1) + 1; if (ofs <= 0) // int overflow @@ -643,7 +642,7 @@ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator while (lastOfs < ofs) { int m = lastOfs + ((ofs - lastOfs) >>> 1); - if (c.compare(key, s.getKey(a, base + m, mutableThiny1)) < 0) + if (c.compare(key, s.getKey(a, base + m, key1)) < 0) ofs = m; // key < a[b + m] else lastOfs = m + 1; // a[b + m] <= key @@ -692,8 +691,8 @@ private void mergeLo(int base1, int len1, int base2, int len2) { return; } - K mutableThiny1 = s.createNewMutableThingy(); - K mutableThiny2 = s.createNewMutableThingy(); + K key0 = s.newKey(); + K key1 = s.newKey(); Comparator c = this.c; // Use local 
variable for performance int minGallop = this.minGallop; // " " " " " @@ -708,7 +707,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { */ do { assert len1 > 1 && len2 > 0; - if (c.compare(s.getKey(a, cursor2, mutableThiny1), s.getKey(tmp, cursor1, mutableThiny2)) < 0) { + if (c.compare(s.getKey(a, cursor2, key0), s.getKey(tmp, cursor1, key1)) < 0) { s.copyElement(a, cursor2++, a, dest++); count2++; count1 = 0; @@ -730,7 +729,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { */ do { assert len1 > 1 && len2 > 0; - count1 = gallopRight(s.getKey(a, cursor2, mutableThiny1), tmp, cursor1, len1, 0, c); + count1 = gallopRight(s.getKey(a, cursor2, key0), tmp, cursor1, len1, 0, c); if (count1 != 0) { s.copyRange(tmp, cursor1, a, dest, count1); dest += count1; @@ -743,7 +742,7 @@ private void mergeLo(int base1, int len1, int base2, int len2) { if (--len2 == 0) break outer; - count2 = gallopLeft(s.getKey(tmp, cursor1, mutableThiny1), a, cursor2, len2, 0, c); + count2 = gallopLeft(s.getKey(tmp, cursor1, key0), a, cursor2, len2, 0, c); if (count2 != 0) { s.copyRange(a, cursor2, a, dest, count2); dest += count2; @@ -800,9 +799,8 @@ private void mergeHi(int base1, int len1, int base2, int len2) { int cursor2 = len2 - 1; // Indexes into tmp array int dest = base2 + len2 - 1; // Indexes into a - K mutableThiny1 = s.createNewMutableThingy(); - K mutableThiny2 = s.createNewMutableThingy(); - + K key0 = s.newKey(); + K key1 = s.newKey(); // Move last element of first run and deal with degenerate cases s.copyElement(a, cursor1--, a, dest--); @@ -831,7 +829,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) { */ do { assert len1 > 0 && len2 > 1; - if (c.compare(s.getKey(tmp, cursor2, mutableThiny1), s.getKey(a, cursor1, mutableThiny2)) < 0) { + if (c.compare(s.getKey(tmp, cursor2, key0), s.getKey(a, cursor1, key1)) < 0) { s.copyElement(a, cursor1--, a, dest--); count1++; count2 = 0; @@ -853,7 +851,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) { */ do { assert len1 > 0 && len2 > 1; - count1 = len1 - gallopRight(s.getKey(tmp, cursor2, mutableThiny1), a, base1, len1, len1 - 1, c); + count1 = len1 - gallopRight(s.getKey(tmp, cursor2, key0), a, base1, len1, len1 - 1, c); if (count1 != 0) { dest -= count1; cursor1 -= count1; @@ -866,7 +864,7 @@ private void mergeHi(int base1, int len1, int base2, int len2) { if (--len2 == 1) break outer; - count2 = len2 - gallopLeft(s.getKey(a, cursor1, mutableThiny1), tmp, 0, len2, len2 - 1, c); + count2 = len2 - gallopLeft(s.getKey(a, cursor1, key0), tmp, 0, len2, len2 - 1, c); if (count2 != 0) { dest -= count2; cursor2 -= count2; diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index 1eaa0491c4b0d..99ecb040957bb 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -34,12 +34,17 @@ import scala.reflect.ClassTag */ // TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity. private[spark] trait SortDataFormat[K, Buffer] extends Any { + + /** Creates a new mutable key for reuse. */ + protected def newKey(): K = null.asInstanceOf[K] + /** Return the sort key for the element at the given index. 
*/ protected def getKey(data: Buffer, pos: Int): K - protected def createNewMutableThingy(): K = null.asInstanceOf[K] - - protected def getKey(data: Buffer, pos: Int, mutableThingy: K): K = { + /** + * Returns the sort key for the element at the given index and reuse the input key if possible. + */ + protected def getKey(data: Buffer, pos: Int, reuse: K): K = { getKey(data, pos) } From b00db4d0cd0b4080e296b1100f57918548568c18 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 24 Oct 2014 15:00:31 -0700 Subject: [PATCH 03/13] doc and tests --- .../apache/spark/util/collection/Sorter.java | 6 ++- .../spark/util/collection/SorterSuite.scala | 49 ++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java index 7d4dba85b2489..354459a84710e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/Sorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java @@ -24,12 +24,16 @@ * See the method comment on sort() for more details. * * This has been kept in Java with the original style in order to match very closely with the - * Anroid source code, and thus be easy to verify correctness. + * Android source code, and thus be easy to verify correctness. * * The purpose of the port is to generalize the interface to the sort to accept input data formats * besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap * uses this to sort an Array with alternating elements of the form [key, value, key, value]. * This generalization comes with minimal overhead -- see SortDataFormat for more information. + * + * We allow key reuse to prevent creating many key objects -- see SortDataFormat. + * + * @see org.apache.spark.util.collection.SortDataFormat */ class Sorter { diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 6fe1079c2719a..ea88ae24adf82 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -30,11 +30,15 @@ class SorterSuite extends FunSuite { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() } val data1 = data0.clone() + val data2 = data0.clone() Arrays.sort(data0) new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int) + new Sorter(new KeyReuseIntArraySortDataFormat) + .sort(data2, 0, data2.length, Ordering[IntWrapper]) - data0.zip(data1).foreach { case (x, y) => assert(x === y) } + assert(data0.view === data1.view) + assert(data0.view === data2.view) } test("KVArraySorter") { @@ -137,12 +141,7 @@ class SorterSuite extends FunSuite { } } - -/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */ -class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] { - override protected def getKey(data: Array[Int], pos: Int): Int = { - data(pos) - } +abstract class AbstractIntArraySortDataFormat[K] extends SortDataFormat[K, Array[Int]] { override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = { val tmp = data(pos0) @@ -165,3 +164,39 @@ class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] { new Array[Int](length) } } + +/** Format to sort a simple Array[Int]. Could be easily generified and specialized. 
*/ +class IntArraySortDataFormat extends AbstractIntArraySortDataFormat[Int] { + + override protected def getKey(data: Array[Int], pos: Int): Int = { + data(pos) + } +} + +/** Wrapper of Int for key reuse. */ +class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { + override def compare(that: IntWrapper): Int = { + key.compareTo(that.key) + } +} + +/** SortDataFormat for Array[Int] with reused keys. */ +class KeyReuseIntArraySortDataFormat extends AbstractIntArraySortDataFormat[IntWrapper] { + + override protected def newKey(): IntWrapper = { + new IntWrapper() + } + + override protected def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = { + if (reuse == null) { + new IntWrapper(data(pos)) + } else { + reuse.key = data(pos) + reuse + } + } + + override protected def getKey(data: Array[Int], pos: Int): IntWrapper = { + getKey(data, pos, null) + } +} From 6ffbe66b0705779ec17e801328bc1fa119e709d1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 25 Oct 2014 20:05:14 -0700 Subject: [PATCH 04/13] rename Sorter to TimSort and add a Scala wrapper that is private[spark] --- .../collection/{Sorter.java => TimSort.java} | 21 +++++----- .../util/collection/SortDataFormat.scala | 10 ++++- .../apache/spark/util/collection/Sorter.scala | 39 +++++++++++++++++++ 3 files changed, 59 insertions(+), 11 deletions(-) rename core/src/main/java/org/apache/spark/util/collection/{Sorter.java => TimSort.java} (97%) create mode 100644 core/src/main/scala/org/apache/spark/util/collection/Sorter.scala diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/TimSort.java similarity index 97% rename from core/src/main/java/org/apache/spark/util/collection/Sorter.java rename to core/src/main/java/org/apache/spark/util/collection/TimSort.java index 354459a84710e..409e1a41c5d49 100644 --- a/core/src/main/java/org/apache/spark/util/collection/Sorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/TimSort.java @@ -20,11 +20,13 @@ import java.util.Comparator; /** - * A port of the Android Timsort class, which utilizes a "stable, adaptive, iterative mergesort." + * A port of the Android TimSort class, which utilizes a "stable, adaptive, iterative mergesort." * See the method comment on sort() for more details. * * This has been kept in Java with the original style in order to match very closely with the - * Android source code, and thus be easy to verify correctness. + * Android source code, and thus be easy to verify correctness. The class is package private. We put + * a simple Scala wrapper {@link org.apache.spark.util.collection.Sorter}, which is available to + * package org.apache.spark. * * The purpose of the port is to generalize the interface to the sort to accept input data formats * besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap @@ -34,8 +36,9 @@ * We allow key reuse to prevent creating many key objects -- see SortDataFormat. * * @see org.apache.spark.util.collection.SortDataFormat + * @see org.apache.spark.util.collection.Sorter */ -class Sorter { +class TimSort { /** * This is the minimum sized sequence that will be merged. 
Shorter @@ -58,7 +61,7 @@ class Sorter { private final SortDataFormat s; - public Sorter(SortDataFormat sortDataFormat) { + public TimSort(SortDataFormat sortDataFormat) { this.s = sortDataFormat; } @@ -95,7 +98,7 @@ public Sorter(SortDataFormat sortDataFormat) { * * @author Josh Bloch */ - void sort(Buffer a, int lo, int hi, Comparator c) { + public void sort(Buffer a, int lo, int hi, Comparator c) { assert c != null; int nRemaining = hi - lo; @@ -242,16 +245,16 @@ private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator= 0) + while (runHi < hi && c.compare(s.getKey(a, runHi, key0), s.getKey(a, runHi - 1, key1)) >= 0) runHi++; } diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index 99ecb040957bb..ffaddd95395e3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -33,9 +33,13 @@ import scala.reflect.ClassTag * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]). */ // TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity. -private[spark] trait SortDataFormat[K, Buffer] extends Any { +private[spark] +trait SortDataFormat[K, Buffer] extends Any { - /** Creates a new mutable key for reuse. */ + /** + * Creates a new mutable key for reuse. This should be implemented if you want to override + * [[getKey(Buffer, Int, K)]]. + */ protected def newKey(): K = null.asInstanceOf[K] /** Return the sort key for the element at the given index. */ @@ -43,6 +47,8 @@ private[spark] trait SortDataFormat[K, Buffer] extends Any { /** * Returns the sort key for the element at the given index and reuse the input key if possible. + * The default implementation ignores the reuse parameter and invokes [[getKey(Buffer, Int]]. + * If you want to override this method, you must implement [[newKey()]]. */ protected def getKey(data: Buffer, pos: Int, reuse: K): K = { getKey(data, pos) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala b/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala new file mode 100644 index 0000000000000..fe7e63e355b93 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util.Comparator + +/** + * A simpler wrapper over the Java implementation [[TimSort]]. + * + * The Java implementation is package private, and hence it cannot be called outside package + * org.apache.spark.util.collection. 
This is a simple wrapper of it that is available to spark. + */ +private[spark] +class Sorter[K, Buffer](private val s: SortDataFormat[K, Buffer]) { + + private val timSort = new TimSort(s) + + /** + * Sorts the input buffer within range [lo, hi). + */ + def sort(a: Buffer, lo: Int, hi: Int, c: Comparator[_ >: K]): Unit = { + timSort.sort(a, lo, hi, c) + } +} From 5f0d53040aa41daaf9f5211f3f5caa48a7f01508 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sat, 25 Oct 2014 20:07:13 -0700 Subject: [PATCH 05/13] update method modifiers of SortDataFormat --- .../util/collection/SortDataFormat.scala | 23 +++++++++---------- .../spark/util/collection/SorterSuite.scala | 12 +++++----- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index ffaddd95395e3..dd8aeee870093 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -40,7 +40,7 @@ trait SortDataFormat[K, Buffer] extends Any { * Creates a new mutable key for reuse. This should be implemented if you want to override * [[getKey(Buffer, Int, K)]]. */ - protected def newKey(): K = null.asInstanceOf[K] + def newKey(): K = null.asInstanceOf[K] /** Return the sort key for the element at the given index. */ protected def getKey(data: Buffer, pos: Int): K @@ -50,27 +50,27 @@ trait SortDataFormat[K, Buffer] extends Any { * The default implementation ignores the reuse parameter and invokes [[getKey(Buffer, Int]]. * If you want to override this method, you must implement [[newKey()]]. */ - protected def getKey(data: Buffer, pos: Int, reuse: K): K = { + def getKey(data: Buffer, pos: Int, reuse: K): K = { getKey(data, pos) } /** Swap two elements. */ - protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit + def swap(data: Buffer, pos0: Int, pos1: Int): Unit /** Copy a single element from src(srcPos) to dst(dstPos). */ - protected def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit + def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit /** * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos. * Overlapping ranges are allowed. */ - protected def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit + def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit /** * Allocates a Buffer that can hold up to 'length' elements. * All elements of the buffer should be considered invalid until data is explicitly copied in. 
*/ - protected def allocate(length: Int): Buffer + def allocate(length: Int): Buffer } /** @@ -84,9 +84,9 @@ trait SortDataFormat[K, Buffer] extends Any { private[spark] class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, Array[T]] { - override protected def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K] + override def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K] - override protected def swap(data: Array[T], pos0: Int, pos1: Int) { + override def swap(data: Array[T], pos0: Int, pos1: Int) { val tmpKey = data(2 * pos0) val tmpVal = data(2 * pos0 + 1) data(2 * pos0) = data(2 * pos1) @@ -95,17 +95,16 @@ class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, data(2 * pos1 + 1) = tmpVal } - override protected def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) { + override def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) { dst(2 * dstPos) = src(2 * srcPos) dst(2 * dstPos + 1) = src(2 * srcPos + 1) } - override protected def copyRange(src: Array[T], srcPos: Int, - dst: Array[T], dstPos: Int, length: Int) { + override def copyRange(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int, length: Int) { System.arraycopy(src, 2 * srcPos, dst, 2 * dstPos, 2 * length) } - override protected def allocate(length: Int): Array[T] = { + override def allocate(length: Int): Array[T] = { new Array[T](2 * length) } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index ea88ae24adf82..600145d962804 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -143,24 +143,24 @@ class SorterSuite extends FunSuite { abstract class AbstractIntArraySortDataFormat[K] extends SortDataFormat[K, Array[Int]] { - override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = { + override def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = { val tmp = data(pos0) data(pos0) = data(pos1) data(pos1) = tmp } - override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) { + override def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) { dst(dstPos) = src(srcPos) } /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */ - override protected def copyRange(src: Array[Int], srcPos: Int, + override def copyRange(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int, length: Int) { System.arraycopy(src, srcPos, dst, dstPos, length) } /** Allocates a new structure that can hold up to 'length' elements. */ - override protected def allocate(length: Int): Array[Int] = { + override def allocate(length: Int): Array[Int] = { new Array[Int](length) } } @@ -183,11 +183,11 @@ class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { /** SortDataFormat for Array[Int] with reused keys. 
*/ class KeyReuseIntArraySortDataFormat extends AbstractIntArraySortDataFormat[IntWrapper] { - override protected def newKey(): IntWrapper = { + override def newKey(): IntWrapper = { new IntWrapper() } - override protected def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = { + override def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = { if (reuse == null) { new IntWrapper(data(pos)) } else { From 862635666c45edc067907e696797f7a22984317b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 11:43:32 -0700 Subject: [PATCH 06/13] add prepare to timeIt and update testsin SorterSuite --- .../scala/org/apache/spark/util/Utils.scala | 15 +++- .../spark/util/collection/SorterSuite.scala | 77 ++++++++++++++----- 2 files changed, 68 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ccbddd985ae0a..14f345ffff63d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1239,10 +1239,17 @@ private[spark] object Utils extends Logging { * @param numIters number of iterations * @param f function to be executed */ - def timeIt(numIters: Int)(f: => Unit): Long = { - val start = System.currentTimeMillis - times(numIters)(f) - System.currentTimeMillis - start + def timeIt(numIters: Int)(f: => Unit, prepare: => Unit = ()): Long = { + var i = 0 + var sum = 0L + while (i < numIters) { + prepare + val start = System.currentTimeMillis + f + sum += System.currentTimeMillis - start + i += 1 + } + sum } /** diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 600145d962804..ab990125e73e6 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import java.lang.{Float => JFloat} +import java.lang.{Float => JFloat, Integer => JInteger} import java.util.{Arrays, Comparator} import org.scalatest.FunSuite @@ -81,14 +81,14 @@ class SorterSuite extends FunSuite { ignore("Sorter benchmark") { /** Runs an experiment several times. 
*/ - def runExperiment(name: String)(f: => Unit): Unit = { - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f) + def runExperiment(name: String)(f: => Unit, prepare: => Unit): Unit = { + val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, prepare) System.gc() var i = 0 var next10: Long = 0 while (i < 10) { - val time = org.apache.spark.util.Utils.timeIt(1)(f) + val time = org.apache.spark.util.Utils.timeIt(1)(f, prepare) next10 += time println(s"$name: Took $time ms") i += 1 @@ -105,39 +105,76 @@ class SorterSuite extends FunSuite { } // Test our key-value pairs where each element is a Tuple2[Float, Integer) - val kvTupleArray = Array.tabulate[AnyRef](numElements) { i => - (keys(i / 2): Float, i / 2: Int) + val kvTupleArray = new Array[AnyRef](numElements) + val prepareKvTupleArray = () => { + var i = 0 + while (i < numElements) { + kvTupleArray(i) = (keys(i): Float, i: Int) + i += 1 + } } - runExperiment("Tuple-sort using Arrays.sort()") { + runExperiment("Tuple-sort using Arrays.sort()")({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) }) - } + }, prepareKvTupleArray) // Test our Sorter where each element alternates between Float and Integer, non-primitive - val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i => - if (i % 2 == 0) keys(i / 2) else new Integer(i / 2) + val keyValueArray = new Array[AnyRef](numElements * 2) + val prepareKeyValueArray = () => { + var i = 0 + while(i < numElements) { + keyValueArray(2 * i) = keys(i) + keyValueArray(2 * i + 1) = new Integer(i) + i += 1 + } } + val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef]) - runExperiment("KV-sort using Sorter") { + runExperiment("KV-sort using Sorter")({ sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] { override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) }) + }, prepareKeyValueArray) + + // Test primitive and non-primitive sort on int array + + val intArray = Array.fill(numElements)(rand.nextInt()) + + val intObjectArray = new Array[JInteger](numElements) + val prepareIntObjectArray = () => { + var i = 0 + while (i < numElements) { + intObjectArray(i) = intArray(i) + i += 1 + } } - // Test non-primitive sort on float array - runExperiment("Java Arrays.sort()") { - Arrays.sort(keys, new Comparator[JFloat] { - override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) + runExperiment("Java Arrays.sort() on non-primitive int array")({ + Arrays.sort(intObjectArray, new Comparator[JInteger] { + override def compare(x: JInteger, y: JInteger): Int = x.compareTo(y) }) - } + }, prepareIntObjectArray) - // Test primitive sort on float array - val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() } - runExperiment("Java Arrays.sort() on primitive keys") { - Arrays.sort(primitiveKeys) + val intPrimitiveArray = new Array[Int](numElements) + val prepareIntPrimitiveArray = () = { + System.arraycopy(intArray, 0, intPrimitiveArray, 0, numElements) } + + runExperiment("Java Arrays.sort() on primitive int array")({ + Arrays.sort(intPrimitiveArray) + }, prepareIntPrimitiveArray) + + val sorterWithoutKeyReuse = new Sorter(new IntArraySortDataFormat) + runExperiment("Sorter without key reuse on primitive int array ")({ + sorterWithoutKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[Int]) + }, prepareIntPrimitiveArray) + + val sorterWithKeyReuse = new Sorter(new 
KeyReuseIntArraySortDataFormat) + runExperiment("Sorter without key resue on primitive int array")({ + sorterWithKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[IntWrapper]) + }, prepareIntPrimitiveArray) } } From 7de2efdb7218b9387b69b12d18c706cf7444f903 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 14:10:15 -0700 Subject: [PATCH 07/13] update the Sorter benchmark code to be correct --- .../scala/org/apache/spark/util/Utils.scala | 3 +- .../spark/util/random/XORShiftRandom.scala | 2 +- .../org/apache/spark/util/UtilsSuite.scala | 11 ++++ .../spark/util/collection/SorterSuite.scala | 64 +++++++++++-------- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 14f345ffff63d..dc1824182c102 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1238,8 +1238,9 @@ private[spark] object Utils extends Logging { * Timing method based on iterations that permit JVM JIT optimization. * @param numIters number of iterations * @param f function to be executed + * @param prepare function to be executed before each call to f. Its running time doesn't count. */ - def timeIt(numIters: Int)(f: => Unit, prepare: => Unit = ()): Long = { + def timeIt(numIters: Int)(f: => Unit, prepare: => Unit = {}): Long = { var i = 0 var sum = 0L while (i < numIters) { diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 55b5713706178..2eeffab56fe13 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -96,7 +96,7 @@ private[spark] object XORShiftRandom { xorRand.nextInt() } - val iters = timeIt(numIters)(_) + val iters: (=> Unit) => Unit = (f) => timeIt(numIters)(f) /* Return results as a map instead of just printing to screen in case the user wants to do something with them */ diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index ea7ef0524d1e1..58026417fe5df 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -351,4 +351,15 @@ class UtilsSuite extends FunSuite { outFile.delete() } } + + test("timeIt with prepare") { + var cnt = 0 + val prepare = () => { + cnt += 1 + Thread.sleep(1000) + } + val time = Utils.timeIt(2)({}, prepare()) + require(cnt === 2, "prepare should be called twice") + require(time < 500, "preparation time should not count") + } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index ab990125e73e6..98f8779281d60 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -81,7 +81,12 @@ class SorterSuite extends FunSuite { ignore("Sorter benchmark") { /** Runs an experiment several times. 
*/ - def runExperiment(name: String)(f: => Unit, prepare: => Unit): Unit = { + def runExperiment(name: String, skip: Boolean = true)(f: => Unit, prepare: => Unit): Unit = { + if (skip) { + println(s"Skiped test $name.") + return + } + val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, prepare) System.gc() @@ -100,53 +105,56 @@ class SorterSuite extends FunSuite { val numElements = 25000000 // 25 mil val rand = new XORShiftRandom(123) - val keys = Array.tabulate[JFloat](numElements) { i => - new JFloat(rand.nextFloat()) + // Test our key-value pairs where each element is a Tuple2[Float, Integer) + val kvTuples = Array.tabulate(numElements) { i => + (rand.nextFloat(), i) } - // Test our key-value pairs where each element is a Tuple2[Float, Integer) val kvTupleArray = new Array[AnyRef](numElements) val prepareKvTupleArray = () => { - var i = 0 - while (i < numElements) { - kvTupleArray(i) = (keys(i): Float, i: Int) - i += 1 - } + System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements) } runExperiment("Tuple-sort using Arrays.sort()")({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) }) - }, prepareKvTupleArray) + }, prepareKvTupleArray()) // Test our Sorter where each element alternates between Float and Integer, non-primitive - val keyValueArray = new Array[AnyRef](numElements * 2) - val prepareKeyValueArray = () => { + + val keyValues = { + val data = new Array[AnyRef](numElements * 2) var i = 0 - while(i < numElements) { - keyValueArray(2 * i) = keys(i) - keyValueArray(2 * i + 1) = new Integer(i) + while (i < numElements) { + data(2 * i) = new JFloat(kvTuples(i)._1) + data(2 * i + 1) = new JInteger(kvTuples(i)._2) i += 1 } + data + } + + val keyValueArray = new Array[AnyRef](numElements * 2) + val prepareKeyValueArray = () => { + System.arraycopy(keyValues, 0, keyValueArray, 0, numElements * 2) } val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef]) runExperiment("KV-sort using Sorter")({ - sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] { + sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] { override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) }) - }, prepareKeyValueArray) + }, prepareKeyValueArray()) // Test primitive and non-primitive sort on int array - val intArray = Array.fill(numElements)(rand.nextInt()) + val ints = Array.fill(numElements)(rand.nextInt()) val intObjectArray = new Array[JInteger](numElements) val prepareIntObjectArray = () => { var i = 0 while (i < numElements) { - intObjectArray(i) = intArray(i) + intObjectArray(i) = ints(i) i += 1 } } @@ -155,26 +163,26 @@ class SorterSuite extends FunSuite { Arrays.sort(intObjectArray, new Comparator[JInteger] { override def compare(x: JInteger, y: JInteger): Int = x.compareTo(y) }) - }, prepareIntObjectArray) + }, prepareIntObjectArray()) val intPrimitiveArray = new Array[Int](numElements) - val prepareIntPrimitiveArray = () = { - System.arraycopy(intArray, 0, intPrimitiveArray, 0, numElements) + val prepareIntPrimitiveArray = () => { + System.arraycopy(ints, 0, intPrimitiveArray, 0, numElements) } runExperiment("Java Arrays.sort() on primitive int array")({ Arrays.sort(intPrimitiveArray) - }, prepareIntPrimitiveArray) + }, prepareIntPrimitiveArray()) val sorterWithoutKeyReuse = new Sorter(new IntArraySortDataFormat) - runExperiment("Sorter without key reuse on primitive int array ")({ + runExperiment("Sorter 
without key reuse on primitive int array")({ sorterWithoutKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[Int]) - }, prepareIntPrimitiveArray) + }, prepareIntPrimitiveArray()) val sorterWithKeyReuse = new Sorter(new KeyReuseIntArraySortDataFormat) - runExperiment("Sorter without key resue on primitive int array")({ + runExperiment("Sorter with key reuse on primitive int array")({ sorterWithKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[IntWrapper]) - }, prepareIntPrimitiveArray) + }, prepareIntPrimitiveArray()) } } @@ -213,7 +221,7 @@ class IntArraySortDataFormat extends AbstractIntArraySortDataFormat[Int] { /** Wrapper of Int for key reuse. */ class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { override def compare(that: IntWrapper): Int = { - key.compareTo(that.key) + Ordering.Int.compare(key, that.key) } } From 78f2879771910c8c7a40c2391af771a7cb57ee91 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 16:26:39 -0700 Subject: [PATCH 08/13] update tests --- .../spark/util/collection/SorterSuite.scala | 89 +++++++++++-------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 98f8779281d60..43008b1b3f70e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -65,6 +65,28 @@ class SorterSuite extends FunSuite { } } + /** Runs an experiment several times. */ + def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: => Unit): Unit = { + if (skip) { + println(s"Skipped experiment $name.") + return + } + + val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, prepare) + System.gc() + + var i = 0 + var next10: Long = 0 + while (i < 10) { + val time = org.apache.spark.util.Utils.timeIt(1)(f, prepare) + next10 += time + println(s"$name: Took $time ms") + i += 1 + } + + println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)") + } + /** * This provides a simple benchmark for comparing the Sorter with Java internal sorting. * Ideally these would be executed one at a time, each in their own JVM, so their listing @@ -76,38 +98,16 @@ class SorterSuite extends FunSuite { * those, while the Sorter approach can work directly on the input data format. * * Note that the Java implementation varies tremendously between Java 6 and Java 7, when - * the Java sort changed from merge sort to Timsort. + * the Java sort changed from merge sort to TimSort. */ - ignore("Sorter benchmark") { - - /** Runs an experiment several times. */ - def runExperiment(name: String, skip: Boolean = true)(f: => Unit, prepare: => Unit): Unit = { - if (skip) { - println(s"Skiped test $name.") - return - } - - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, prepare) - System.gc() - - var i = 0 - var next10: Long = 0 - while (i < 10) { - val time = org.apache.spark.util.Utils.timeIt(1)(f, prepare) - next10 += time - println(s"$name: Took $time ms") - i += 1 - } - - println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)") - } - + ignore("Sorter benchmark for key-value pairs") { val numElements = 25000000 // 25 mil val rand = new XORShiftRandom(123) - // Test our key-value pairs where each element is a Tuple2[Float, Integer) + // Test our key-value pairs where each element is a Tuple2[Float, Integer]. 
+ val kvTuples = Array.tabulate(numElements) { i => - (rand.nextFloat(), i) + (new JFloat(rand.nextFloat()), new JInteger(i)) } val kvTupleArray = new Array[AnyRef](numElements) @@ -117,7 +117,7 @@ class SorterSuite extends FunSuite { runExperiment("Tuple-sort using Arrays.sort()")({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = - Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) + x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1) }) }, prepareKvTupleArray()) @@ -127,8 +127,8 @@ class SorterSuite extends FunSuite { val data = new Array[AnyRef](numElements * 2) var i = 0 while (i < numElements) { - data(2 * i) = new JFloat(kvTuples(i)._1) - data(2 * i + 1) = new JInteger(kvTuples(i)._2) + data(2 * i) = kvTuples(i)._1 + data(2 * i + 1) = kvTuples(i)._2 i += 1 } data @@ -142,21 +142,34 @@ class SorterSuite extends FunSuite { val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef]) runExperiment("KV-sort using Sorter")({ sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] { - override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) + override def compare(x: JFloat, y: JFloat): Int = x.compareTo(y) }) }, prepareKeyValueArray()) + } - // Test primitive and non-primitive sort on int array + /** + * Tests for sorting with primitive keys with/without key reuse. Java's Arrays.sort is used as + * reference, which is expected to be faster but it can only sort a single array. Sorter can be + * used to sort parallel arrays. + */ + ignore("Sorter benchmark for primitive int array") { + val numElements = 25000000 // 25 mil + val rand = new XORShiftRandom(123) val ints = Array.fill(numElements)(rand.nextInt()) - - val intObjectArray = new Array[JInteger](numElements) - val prepareIntObjectArray = () => { + val intObjects = { + val data = new Array[JInteger](numElements) var i = 0 while (i < numElements) { - intObjectArray(i) = ints(i) + data(i) = new JInteger(ints(i)) i += 1 } + data + } + + val intObjectArray = new Array[JInteger](numElements) + val prepareIntObjectArray = () => { + System.arraycopy(intObjects, 0, intObjectArray, 0, numElements) } runExperiment("Java Arrays.sort() on non-primitive int array")({ @@ -199,8 +212,7 @@ abstract class AbstractIntArraySortDataFormat[K] extends SortDataFormat[K, Array } /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */ - override def copyRange(src: Array[Int], srcPos: Int, - dst: Array[Int], dstPos: Int, length: Int) { + override def copyRange(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int, length: Int) { System.arraycopy(src, srcPos, dst, dstPos, length) } @@ -220,6 +232,7 @@ class IntArraySortDataFormat extends AbstractIntArraySortDataFormat[Int] { /** Wrapper of Int for key reuse. 
*/ class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { + override def compare(that: IntWrapper): Int = { Ordering.Int.compare(key, that.key) } From 720f7317ff5267391b099531edab7063876b85c3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 19:01:53 -0700 Subject: [PATCH 09/13] add doc about JIT specialization --- .../org/apache/spark/util/collection/SortDataFormat.scala | 5 +++-- .../org/apache/spark/util/collection/SorterSuite.scala | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index dd8aeee870093..857593afebdaa 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -27,14 +27,15 @@ import scala.reflect.ClassTag * Example format: an array of numbers, where each element is also the key. * See [[KVArraySortDataFormat]] for a more exciting format. * - * This trait extends Any to ensure it is universal (and thus compiled to a Java interface). + * Declaring and instantiating multiple subclasses of this class would prevent JIT inlining + * overridden methods and hence decrease the shuffle performance. * * @tparam K Type of the sort key of each element * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]). */ // TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity. private[spark] -trait SortDataFormat[K, Buffer] extends Any { +abstract class SortDataFormat[K, Buffer] extends Any { /** * Creates a new mutable key for reuse. This should be implemented if you want to override diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 43008b1b3f70e..e15fc55ac22d9 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -90,7 +90,8 @@ class SorterSuite extends FunSuite { /** * This provides a simple benchmark for comparing the Sorter with Java internal sorting. * Ideally these would be executed one at a time, each in their own JVM, so their listing - * here is mainly to have the code. + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. * * The goal of this code is to sort an array of key-value pairs, where the array physically * has the keys and values alternating. The basic Java sorts work only on the keys, so the @@ -151,6 +152,10 @@ class SorterSuite extends FunSuite { * Tests for sorting with primitive keys with/without key reuse. Java's Arrays.sort is used as * reference, which is expected to be faster but it can only sort a single array. Sorter can be * used to sort parallel arrays. + * + * Ideally these would be executed one at a time, each in their own JVM, so their listing + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. 
*/ ignore("Sorter benchmark for primitive int array") { val numElements = 25000000 // 25 mil From 38ba50c6249b90f17282dce1bd7207922c0dd266 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 19:09:34 -0700 Subject: [PATCH 10/13] update timeIt --- .../main/scala/org/apache/spark/util/Utils.scala | 14 +++++++++++++- .../apache/spark/util/random/XORShiftRandom.scala | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index dc1824182c102..273306518b059 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1238,9 +1238,21 @@ private[spark] object Utils extends Logging { * Timing method based on iterations that permit JVM JIT optimization. * @param numIters number of iterations * @param f function to be executed + */ + def timeIt(numIters: Int)(f: => Unit): Long = { + val start = System.currentTimeMillis + times(numIters)(f) + System.currentTimeMillis - start + } + + /** + * Timing method based on iterations that permit JVM JIT optimization. + * @param numIters number of iterations + * @param f function to be executed. For accurate timing, the execution time for each run must be + * an order of magnitude longer than one millisecond. * @param prepare function to be executed before each call to f. Its running time doesn't count. */ - def timeIt(numIters: Int)(f: => Unit, prepare: => Unit = {}): Long = { + def timeIt(numIters: Int)(f: => Unit, prepare: => Unit): Long = { var i = 0 var sum = 0L while (i < numIters) { diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 2eeffab56fe13..55b5713706178 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -96,7 +96,7 @@ private[spark] object XORShiftRandom { xorRand.nextInt() } - val iters: (=> Unit) => Unit = (f) => timeIt(numIters)(f) + val iters = timeIt(numIters)(_) /* Return results as a map instead of just printing to screen in case the user wants to do something with them */ From a72f53c6f1a6f723be16d493e2a7ac61c213a2c0 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 19:31:41 -0700 Subject: [PATCH 11/13] update timeIt --- .../scala/org/apache/spark/util/Utils.scala | 39 ++++++++----------- .../util/collection/SortDataFormat.scala | 2 +- .../spark/util/random/XORShiftRandom.scala | 2 +- .../org/apache/spark/util/UtilsSuite.scala | 2 +- .../spark/util/collection/SorterSuite.scala | 20 +++++----- 5 files changed, 30 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 273306518b059..47d39bf4ae444 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1237,32 +1237,27 @@ private[spark] object Utils extends Logging { /** * Timing method based on iterations that permit JVM JIT optimization. * @param numIters number of iterations - * @param f function to be executed - */ - def timeIt(numIters: Int)(f: => Unit): Long = { - val start = System.currentTimeMillis - times(numIters)(f) - System.currentTimeMillis - start - } - - /** - * Timing method based on iterations that permit JVM JIT optimization. 
- * @param numIters number of iterations - * @param f function to be executed. For accurate timing, the execution time for each run must be - * an order of magnitude longer than one millisecond. + * @param f function to be executed. If prepare is not None, the running time of each call to f + * must be an order of magnitude longer than one millisecond for accurate timing. * @param prepare function to be executed before each call to f. Its running time doesn't count. */ - def timeIt(numIters: Int)(f: => Unit, prepare: => Unit): Long = { - var i = 0 - var sum = 0L - while (i < numIters) { - prepare + def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = { + if (prepare.isEmpty) { val start = System.currentTimeMillis - f - sum += System.currentTimeMillis - start - i += 1 + times(numIters)(f) + System.currentTimeMillis - start + } else { + var i = 0 + var sum = 0L + while (i < numIters) { + prepare.get.apply() + val start = System.currentTimeMillis + f + sum += System.currentTimeMillis - start + i += 1 + } + sum } - sum } /** diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index 857593afebdaa..97179e712159a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -35,7 +35,7 @@ import scala.reflect.ClassTag */ // TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity. private[spark] -abstract class SortDataFormat[K, Buffer] extends Any { +abstract class SortDataFormat[K, Buffer] { /** * Creates a new mutable key for reuse. This should be implemented if you want to override diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 55b5713706178..451c0f9406fe5 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -96,7 +96,7 @@ private[spark] object XORShiftRandom { xorRand.nextInt() } - val iters = timeIt(numIters)(_) + val iters: (=> Unit) => Unit = (x) => timeIt(numIters)(x) /* Return results as a map instead of just printing to screen in case the user wants to do something with them */ diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 58026417fe5df..1e911e20523d6 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -358,7 +358,7 @@ class UtilsSuite extends FunSuite { cnt += 1 Thread.sleep(1000) } - val time = Utils.timeIt(2)({}, prepare()) + val time = Utils.timeIt(2)({}, Some(prepare)) require(cnt === 2, "prepare should be called twice") require(time < 500, "preparation time should not count") } diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index e15fc55ac22d9..066d47c46a0d2 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -66,19 +66,19 @@ class SorterSuite extends FunSuite { } /** Runs an experiment several times. 
*/ - def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: => Unit): Unit = { + def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = { if (skip) { println(s"Skipped experiment $name.") return } - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, prepare) + val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) System.gc() var i = 0 var next10: Long = 0 while (i < 10) { - val time = org.apache.spark.util.Utils.timeIt(1)(f, prepare) + val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) next10 += time println(s"$name: Took $time ms") i += 1 @@ -120,7 +120,7 @@ class SorterSuite extends FunSuite { override def compare(x: AnyRef, y: AnyRef): Int = x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1) }) - }, prepareKvTupleArray()) + }, prepareKvTupleArray) // Test our Sorter where each element alternates between Float and Integer, non-primitive @@ -145,7 +145,7 @@ class SorterSuite extends FunSuite { sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] { override def compare(x: JFloat, y: JFloat): Int = x.compareTo(y) }) - }, prepareKeyValueArray()) + }, prepareKeyValueArray) } /** @@ -157,7 +157,7 @@ class SorterSuite extends FunSuite { * here is mainly to have the code. Running multiple tests within the same JVM session would * prevent JIT inlining overridden methods and hence hurt the performance. */ - ignore("Sorter benchmark for primitive int array") { + test("Sorter benchmark for primitive int array") { val numElements = 25000000 // 25 mil val rand = new XORShiftRandom(123) @@ -181,7 +181,7 @@ class SorterSuite extends FunSuite { Arrays.sort(intObjectArray, new Comparator[JInteger] { override def compare(x: JInteger, y: JInteger): Int = x.compareTo(y) }) - }, prepareIntObjectArray()) + }, prepareIntObjectArray) val intPrimitiveArray = new Array[Int](numElements) val prepareIntPrimitiveArray = () => { @@ -190,17 +190,17 @@ class SorterSuite extends FunSuite { runExperiment("Java Arrays.sort() on primitive int array")({ Arrays.sort(intPrimitiveArray) - }, prepareIntPrimitiveArray()) + }, prepareIntPrimitiveArray) val sorterWithoutKeyReuse = new Sorter(new IntArraySortDataFormat) runExperiment("Sorter without key reuse on primitive int array")({ sorterWithoutKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[Int]) - }, prepareIntPrimitiveArray()) + }, prepareIntPrimitiveArray) val sorterWithKeyReuse = new Sorter(new KeyReuseIntArraySortDataFormat) runExperiment("Sorter with key reuse on primitive int array")({ sorterWithKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[IntWrapper]) - }, prepareIntPrimitiveArray()) + }, prepareIntPrimitiveArray) } } From 0b7b682ea1fad80de08dbf1d63b56d3785f686ec Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 21:39:02 -0700 Subject: [PATCH 12/13] fix mima --- project/MimaExcludes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index c58666af84f24..95152b58e287e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -53,7 +53,9 @@ object MimaExcludes { "org.apache.spark.scheduler.MapStatus"), // TaskContext was promoted to Abstract class ProblemFilters.exclude[AbstractClassProblem]( - "org.apache.spark.TaskContext") + "org.apache.spark.TaskContext"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.util.collection.SortDataFormat") ) ++ Seq( // Adding new methods to 
the JavaRDDLike trait: ProblemFilters.exclude[MissingMethodProblem]( From d73c3d05a9c5cf529a90b2571086830a7a91f0e9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 27 Oct 2014 22:10:21 -0700 Subject: [PATCH 13/13] address comments --- core/src/main/scala/org/apache/spark/util/Utils.scala | 1 + .../org/apache/spark/util/collection/SortDataFormat.scala | 2 +- .../scala/org/apache/spark/util/collection/Sorter.scala | 2 +- .../org/apache/spark/util/random/XORShiftRandom.scala | 8 ++------ 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 47d39bf4ae444..1e1b5b75f6c25 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1240,6 +1240,7 @@ private[spark] object Utils extends Logging { * @param f function to be executed. If prepare is not None, the running time of each call to f * must be an order of magnitude longer than one millisecond for accurate timing. * @param prepare function to be executed before each call to f. Its running time doesn't count. + * @return the total time across all iterations (not couting preparation time) */ def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = { if (prepare.isEmpty) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index 97179e712159a..4f0bf8384afc9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -27,7 +27,7 @@ import scala.reflect.ClassTag * Example format: an array of numbers, where each element is also the key. * See [[KVArraySortDataFormat]] for a more exciting format. * - * Declaring and instantiating multiple subclasses of this class would prevent JIT inlining + * Note: Declaring and instantiating multiple subclasses of this class would prevent JIT inlining * overridden methods and hence decrease the shuffle performance. * * @tparam K Type of the sort key of each element diff --git a/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala b/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala index fe7e63e355b93..39f66b8c428c6 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Sorter.scala @@ -20,7 +20,7 @@ package org.apache.spark.util.collection import java.util.Comparator /** - * A simpler wrapper over the Java implementation [[TimSort]]. + * A simple wrapper over the Java implementation [[TimSort]]. * * The Java implementation is package private, and hence it cannot be called outside package * org.apache.spark.util.collection. This is a simple wrapper of it that is available to spark. 
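
The pieces above (a SortDataFormat that can fill a caller-supplied mutable key, and the Sorter wrapper that drives it) combine roughly as follows. This is an illustrative, self-contained sketch rather than code from the patch series: MiniSortDataFormat and insertionSort are simplified stand-ins for the real SortDataFormat and the TimSort-based Sorter, and IntWrapper mirrors the helper defined in SorterSuite.

// Sketch only: shows the key-reuse idea, not the actual Spark classes.
class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] {
  override def compare(that: IntWrapper): Int = Ordering.Int.compare(key, that.key)
}

// Stand-in for SortDataFormat[K, Buffer]; the real class also has
// copyElement/copyRange/allocate, omitted here for brevity.
abstract class MiniSortDataFormat[K, Buffer] {
  def newKey(): K
  def getKey(data: Buffer, pos: Int, reuse: K): K
  def swap(data: Buffer, pos0: Int, pos1: Int): Unit
}

class KeyReuseIntArrayFormat extends MiniSortDataFormat[IntWrapper, Array[Int]] {
  override def newKey(): IntWrapper = new IntWrapper
  override def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = {
    reuse.key = data(pos)  // fill the caller-provided wrapper instead of boxing a new key
    reuse
  }
  override def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
    val tmp = data(pos0); data(pos0) = data(pos1); data(pos1) = tmp
  }
}

// Two reusable keys are enough for pairwise comparisons,
// so the inner loop allocates nothing per comparison.
def insertionSort[K, B](data: B, lo: Int, hi: Int, s: MiniSortDataFormat[K, B])
                       (implicit ord: Ordering[K]): Unit = {
  val k0 = s.newKey()
  val k1 = s.newKey()
  var i = lo + 1
  while (i < hi) {
    var j = i
    while (j > lo && ord.compare(s.getKey(data, j, k0), s.getKey(data, j - 1, k1)) < 0) {
      s.swap(data, j, j - 1)
      j -= 1
    }
    i += 1
  }
}

val a = Array(3, 1, 2)
insertionSort(a, 0, a.length, new KeyReuseIntArrayFormat)
// a is now Array(1, 2, 3)

Because the sort is written against the format's getKey/swap methods, keeping a single concrete format class per sort keeps those call sites monomorphic, which is the JIT-inlining concern the docs above warn about.
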
diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 451c0f9406fe5..467b890fb4bb9 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -96,13 +96,9 @@ private[spark] object XORShiftRandom { xorRand.nextInt() } - val iters: (=> Unit) => Unit = (x) => timeIt(numIters)(x) - /* Return results as a map instead of just printing to screen in case the user wants to do something with them */ - Map("javaTime" -> iters {javaRand.nextInt()}, - "xorTime" -> iters {xorRand.nextInt()}) - + Map("javaTime" -> timeIt(numIters) { javaRand.nextInt() }, + "xorTime" -> timeIt(numIters) { xorRand.nextInt() }) } - }
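
For reference, the timing pattern that timeIt settles on in this series (an optional prepare step that runs before each iteration but is excluded from the measured total) can be sketched in isolation. The timeIt body below mirrors the version added in the Utils.scala diff above; the TimeItSketch wrapper, the restore helper, and the Arrays.sort harness are illustrative additions, not part of the patch.

object TimeItSketch {
  def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = {
    if (prepare.isEmpty) {
      // No per-iteration setup: time the whole loop with one clock read on each side.
      val start = System.currentTimeMillis
      var i = 0
      while (i < numIters) { f; i += 1 }
      System.currentTimeMillis - start
    } else {
      // Per-iteration setup: time each call to f separately so prepare is not counted.
      var i = 0
      var sum = 0L
      while (i < numIters) {
        prepare.get.apply()
        val start = System.currentTimeMillis
        f
        sum += System.currentTimeMillis - start
        i += 1
      }
      sum
    }
  }

  def main(args: Array[String]): Unit = {
    val src = Array.fill(1000000)(scala.util.Random.nextInt())
    val work = new Array[Int](src.length)
    // Restore the unsorted input before each run so every iteration sorts identical data,
    // while only the sort itself contributes to the reported time.
    val restore = () => System.arraycopy(src, 0, work, 0, src.length)
    val ms = timeIt(10)(java.util.Arrays.sort(work), Some(restore))
    println(s"10 sorts of ${src.length} ints took $ms ms")
  }
}

This is the same shape the benchmarks above use: runExperiment passes Some(prepare) so array setup is excluded, while the XORShiftRandom benchmark calls timeIt without a prepare step and times the whole loop.
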