Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

package org.apache.spark.serializer

import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
Expand Down Expand Up @@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf)
for (cls <- KryoSerializer.toRegister) {
kryo.register(cls)
}
for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
kryo.register(cls, ser)
}

// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
Expand Down Expand Up @@ -363,12 +366,6 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
Expand All @@ -377,6 +374,55 @@ private[serializer] object KryoSerializer {
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)

private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
bitmap.serialize(new KryoOutputDataOutputBridge(output))
}
override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
ret
}
}
)
}

private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
override def readLong(): Long = input.readLong()
override def readChar(): Char = input.readChar()
override def readFloat(): Float = input.readFloat()
override def readByte(): Byte = input.readByte()
override def readShort(): Short = input.readShort()
override def readUTF(): String = input.readString() // readString in kryo does utf8
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
override def readFully(b: Array[Byte]): Unit = input.read(b)
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
override def readLine(): String = throw new UnsupportedOperationException("readLine")
override def readBoolean(): Boolean = input.readBoolean()
override def readUnsignedByte(): Int = input.readByteUnsigned()
override def readDouble(): Double = input.readDouble()
}

private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
override def writeFloat(v: Float): Unit = output.writeFloat(v)
// There is no "readChars" counterpart, except maybe "readLine", which is not supported
override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
override def writeDouble(v: Double): Unit = output.writeDouble(v)
override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
override def writeShort(v: Int): Unit = output.writeShort(v)
override def writeInt(v: Int): Unit = output.writeInt(v)
override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
override def write(b: Int): Unit = output.write(b)
override def write(b: Array[Byte]): Unit = output.write(b)
override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
override def writeBytes(s: String): Unit = output.writeString(s)
override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
override def writeLong(v: Long): Unit = output.writeLong(v)
override def writeByte(v: Int): Unit = output.writeByte(v)
}

/**
Expand Down