Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,32 @@ public class ProtobufSerializer extends Serializer<Message> {
* classes in play, which should not be very large.
* We can replace with a LRU if we start to see any issues.
*/
// Cache for the `parseFrom(byte[] bytes)` method
final protected HashMap<Class, Method> methodCache = new HashMap<Class, Method>();
// Cache for the `getDefaultInstance()` method
final protected HashMap<Class, Method> defaultInstanceMethodCache = new HashMap<Class, Method>();

/**
* This is slow, so we should cache to avoid killing perf:
* See: http://www.jguru.com/faq/view.jsp?EID=246569
*/
protected Method getParse(Class cls) throws Exception {
Method meth = methodCache.get(cls);
private Method getMethodFromCache(Class cls, HashMap<Class, Method> cache, String methodName, Class... parameterTypes) throws Exception {
Method meth = cache.get(cls);
if (null == meth) {
meth = cls.getMethod("parseFrom", new Class[]{ byte[].class });
methodCache.put(cls, meth);
meth = cls.getMethod(methodName, parameterTypes);
cache.put(cls, meth);
}
return meth;
}

protected Method getParse(Class cls) throws Exception {
return getMethodFromCache(cls, methodCache, "parseFrom", byte[].class);
}

protected Method getDefaultInstance(Class cls) throws Exception {
return getMethodFromCache(cls, defaultInstanceMethodCache, "getDefaultInstance");
}

@Override
public void write(Kryo kryo, Output output, Message mes) {
byte[] ser = mes.toByteArray();
Expand All @@ -69,6 +80,9 @@ public void write(Kryo kryo, Output output, Message mes) {
public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@regadas Wanted to get your thoughts on handling null in the serializer.

I was thinking maybe we could serialize nulls to a magic number (-1) which would look like:

@Override
public void write(Kryo kryo, Output output, Message mes) {
    if (mes == null) {
        output.writeInt(-1, true);
    }
    ...
}

and

@Override
public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
    try {      
        int size = input.readInt(true);
        if (size == -1) {
            return null;
        }
        ...
    } catch (Exception e) {
        throw new RuntimeException("Could not create " + pbClass, e); 
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since kryo dispatches serialization based on the .getClass I think this code will never really be called on null in practice.

Can you show a test that fails now just using Kyro not the serializer directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnynek you're right, I cannot get the test to fail inside the serializer with null. However, the com.esotericsoftware.kryo.Serializer base class has a acceptsNull field that's injected through the constructor. Hence, depending on how it's initialized, the serializer might get a null object?

try {
int size = input.readInt(true);
if (size == 0) {
return (Message) getDefaultInstance(pbClass).invoke(null);
}
byte[] barr = new byte[size];
input.readBytes(barr);
return (Message)getParse(pbClass).invoke(null, barr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ProtobufTest extends AnyWordSpec with Matchers {
def buildKyroPoolWithProtoSer(): KryoPool =
KryoPool.withByteArrayOutputStream(
1,
new KryoInstantiator {
override def newKryo(): Kryo = {
val k = new Kryo
k.addDefaultSerializer(classOf[Message], classOf[ProtobufSerializer])
k
}
}
)

def buildFatigueCount(target: Long, id: Long, count: Int, recentClicks: List[Long]): FatigueCount = {
val bldr = FatigueCount
.newBuilder()
Expand All @@ -39,16 +51,7 @@ class ProtobufTest extends AnyWordSpec with Matchers {
}

"Protobuf round-trips" in {
val kpool = KryoPool.withByteArrayOutputStream(
1,
new KryoInstantiator {
override def newKryo(): Kryo = {
val k = new Kryo
k.addDefaultSerializer(classOf[Message], classOf[ProtobufSerializer])
k
}
}
)
val kpool = buildKyroPoolWithProtoSer()

kpool.deepCopy(buildFatigueCount(12L, -1L, 42, List(1L, 2L))) should equal(
buildFatigueCount(12L, -1L, 42, List(1L, 2L))
Expand All @@ -58,4 +61,10 @@ class ProtobufTest extends AnyWordSpec with Matchers {
val kpoolBusted = KryoPool.withByteArrayOutputStream(1, new KryoInstantiator)
an[Exception] should be thrownBy kpoolBusted.deepCopy(buildFatigueCount(12L, -1L, 42, List(1L, 2L)))
}

"Default Instance of Should be Ser-DeSer correctly" in {
val kpool = buildKyroPoolWithProtoSer()

kpool.deepCopy(FatigueCount.getDefaultInstance) should equal(FatigueCount.getDefaultInstance)
}
}