diff --git a/core/pom.xml b/core/pom.xml
index 570a25cf325a..5e9e758d72b7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -173,6 +173,10 @@
net.jpountz.lz4
lz4
+
+ org.roaringbitmap
+ RoaringBitmap
+
commons-net
commons-net
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1827e1..42c6788773b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,8 +20,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
+import org.roaringbitmap.RoaringBitmap
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -121,8 +121,7 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
@@ -132,7 +131,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
- private[this] var emptyBlocks: BitSet,
+ private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {
@@ -145,7 +144,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc
override def getSizeForBlock(reduceId: Int): Long = {
- if (emptyBlocks.get(reduceId)) {
+ if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
@@ -160,7 +159,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
- emptyBlocks = new BitSet
+ emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
@@ -173,18 +172,15 @@ private[spark] object HighlyCompressedMapStatus {
var i = 0
var numNonEmptyBlocks: Int = 0
var totalSize: Long = 0
- // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
- // blocks. From a performance standpoint, we benefit from tracking empty blocks because
- // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+ val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
- val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
- emptyBlocks.set(i)
+ emptyBlocks.add(i)
}
i += 1
}
@@ -193,6 +189,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
+ emptyBlocks.runOptimize()
+ emptyBlocks.trim()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f2820c..b1e2e8424bce 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,24 +21,24 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
-
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
+import org.roaringbitmap._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -362,6 +362,12 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
+ classOf[RoaringBitmap],
+ classOf[RoaringArray],
+ classOf[Array[Container]],
+ classOf[ArrayContainer],
+ classOf[BitmapContainer],
+ classOf[RunContainer],
classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab450..15c8de61b824 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
import scala.util.Random
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}
+
+ test("RoaringBitmap: runOptimize succeeded") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 != 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 > size2)
+ assert(success)
+ }
+
+ test("RoaringBitmap: runOptimize failed") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 == 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 === size2)
+ assert(!success)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index afe2e80358ca..ac000f72970c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -29,11 +29,30 @@ import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
+import org.roaringbitmap.RoaringBitmap
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ test("Roaring") {
+ val conf = new SparkConf(false)
+ conf.set("spark.kryo.registrationRequired", "true")
+ val ser = new KryoSerializer(conf).newInstance()
+ def check[T: ClassTag](t: T) {
+ assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
+ val b = new RoaringBitmap()
+ check(b)
+ for (i <- 1 to 1<<16 by 2) {
+ b.add(i)
+ }
+ check(b)
+ b.add(1, 1<<16)
+ b.runOptimize()
+ check(b)
+ }
+
test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
diff --git a/pom.xml b/pom.xml
index 4ed1c0c82dee..c965bcb0056a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,11 @@
+
+ org.roaringbitmap
+ RoaringBitmap
+ 0.5.10
+
commons-net
commons-net