4 changes: 4 additions & 0 deletions core/pom.xml
@@ -173,6 +173,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
20 changes: 9 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,8 +20,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
import org.roaringbitmap.RoaringBitmap

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -121,8 +121,7 @@

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
* plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
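To make the scheme concrete, here is a minimal standalone sketch of the same idea -- the class and method names are hypothetical, not Spark's API: per-block sizes collapse to a single average, and only emptiness is tracked exactly.

import org.roaringbitmap.RoaringBitmap

// Hypothetical analogue of the size tracking described above: exact answers
// for "is this block empty?", the shared average for everything else.
class ApproxBlockSizes(avgSize: Long, emptyBlocks: RoaringBitmap) {
  def sizeForBlock(reduceId: Int): Long =
    if (emptyBlocks.contains(reduceId)) 0L else avgSize
}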
@@ -132,7 +131,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: BitSet,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

Expand All @@ -145,7 +144,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.get(reduceId)) {
if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
@@ -160,7 +159,7 @@

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new BitSet
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
Expand All @@ -173,18 +172,15 @@ private[spark] object HighlyCompressedMapStatus {
var i = 0
var numNonEmptyBlocks: Int = 0
var totalSize: Long = 0
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
Contributor: This comment still makes sense; could you keep it?

Member Author: OK

if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
emptyBlocks.set(i)
emptyBlocks.add(i)
}
Contributor: If you use RoaringBitmap and the RoaringBitmap objects are not expected to change often after this loop, a call such as emptyBlocks.runOptimize(); nonEmptyBlocks.runOptimize(); might be warranted. It should be checked.

Though it should not help much, you can also investigate whether adding emptyBlocks.trim(); nonEmptyBlocks.trim(); can be helpful.

Contributor: +1. Would this also eliminate the need to even bother with both emptyBlocks and nonEmptyBlocks? E.g., after emptyBlocks.runOptimize() it's just as good as storing the empty blocks? After this point we only care about the memory used and the time it takes to call contains -- these are totally immutable after this.

Does it also make sense to call runOptimize periodically as these are being built, to avoid too much memory being used? Say the upper end of the size of these is ~100k; the worst case would then be storing 100k shorts, or ~200 KB, before we call runOptimize. That isn't really too bad, so I'm inclined to keep things simple, but it seemed worth thinking about now while we're looking.

Contributor: I am not exactly sure why you would keep track of both the empty and non-empty blocks; that seems redundant. But maybe I misunderstand. The comment above is probably right, though: you'd want to minimize the number of bitmap insertions.

By design, even without runOptimize, RoaringBitmap should not use more than 16 bits per value asymptotically. (With just a few values, each will take more than 16 bits, but with thousands they should use 16 bits or less.) So while you cannot be sure that a RoaringBitmap will "compress well" for some meaning of "compress well", you can pretty much bound the memory usage with respect to totalNumBlocks. For 100k possible values, that bound is 200 kB, which still fits in L2 cache -- and that is an upper bound. Asymptotically you are also sure not to use more memory than a BitSet with its wordsInUse set to the size of the universe, so if totalNumBlocks is 100k, neither a RoaringBitmap nor a BitSet should use more than about 12 kB.

So my guess is that a single call to runOptimize at the end would be enough.

Contributor: Let me add that even with BitSet, you'd probably want to call trimToSize() after the BitSets are constructed since, like RoaringBitmap, the underlying "dynamic" arrays can have a capacity that exceeds the actual data size. (This often makes little difference statistically, however.)
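To check the reviewers' suggestion empirically, a standalone sketch along these lines compares sizes before and after compaction (the demo object is hypothetical; runOptimize, trim, and getSizeInBytes are existing RoaringBitmap methods):

import org.roaringbitmap.RoaringBitmap

object RunOptimizeSketch {
  def main(args: Array[String]): Unit = {
    val emptyBlocks = new RoaringBitmap()
    // A long, dense run of block ids -- exactly the shape that
    // run-length-encoded containers handle well.
    (0 until 100000).foreach(i => emptyBlocks.add(i))
    val before = emptyBlocks.getSizeInBytes
    val converted = emptyBlocks.runOptimize() // true if run containers helped
    emptyBlocks.trim()                        // release excess array capacity
    println(s"$before bytes -> ${emptyBlocks.getSizeInBytes} bytes, optimized = $converted")
  }
}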

i += 1
}
@@ -193,6 +189,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
emptyBlocks.runOptimize()
emptyBlocks.trim()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,24 +21,24 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.roaringbitmap._

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Contributor: Could you update your import rules to place the scala package before third parties (as before this PR)?

Member Author: OK
import scala.reflect.ClassTag

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -362,6 +362,12 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[Array[Container]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[RunContainer],
classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
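These registrations matter once spark.kryo.registrationRequired is set, because Kryo then refuses to serialize any class that was not registered explicitly -- including RoaringBitmap's internal container types. A minimal sketch of that behaviour outside Spark (the demo object is hypothetical; the Kryo calls are the library's standard API):

import com.esotericsoftware.kryo.Kryo
import org.roaringbitmap.RoaringBitmap

object RegistrationSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(true)
    // Registering only the top-level class is not enough: serializing a
    // populated bitmap also reaches its internal containers, which is why
    // the list above registers RoaringArray, the Container classes, etc.
    kryo.register(classOf[RoaringBitmap])
  }
}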
core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
import org.roaringbitmap.RoaringBitmap

import scala.util.Random

@@ -97,4 +98,34 @@
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}

test("RoaringBitmap: runOptimize succeeded") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 != 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 > size2)
assert(success)
}

test("RoaringBitmap: runOptimize failed") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 == 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 === size2)
assert(!success)
}
}
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -29,11 +29,30 @@ import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
import org.roaringbitmap.RoaringBitmap

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)

test("Roaring") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
val b = new RoaringBitmap()
check(b)
for (i <- 1 to 1<<16 by 2) {
b.add(i)
}
check(b)
b.add(1, 1<<16)
b.runOptimize()
check(b)
}

test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
5 changes: 5 additions & 0 deletions pom.xml
@@ -623,6 +623,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.5.10</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
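For completeness, the equivalent declaration in an sbt build would be a single line (assuming the same release):

libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.5.10"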