diff --git a/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java index 5627c5201ce..5ec88209706 100644 --- a/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java +++ b/storm-core/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java @@ -22,6 +22,7 @@ import org.apache.storm.serialization.KryoTupleDeserializer; import clojure.lang.IFn; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; import java.util.List; @@ -53,7 +54,10 @@ public void recv(List batch) { KryoTupleDeserializer des = _des.get(); ArrayList ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { - ret.add(new AddressedTuple(message.task(), des.deserialize(message.message()))); + Tuple tuple = des.deserialize(message.message()); + if (tuple != null) { + ret.add(new AddressedTuple(message.task(), tuple)); + } } _cb.invoke(ret); } diff --git a/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java b/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java index 4e877a8c00b..62e2014b525 100644 --- a/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java +++ b/storm-core/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java @@ -22,11 +22,15 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import com.esotericsoftware.kryo.io.Input; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.List; import java.util.Map; public class KryoTupleDeserializer implements ITupleDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class); GeneralTopologyContext _context; KryoValuesDeserializer _kryo; SerializationFactory.IdDictionary _ids; @@ -50,7 +54,8 @@ public Tuple deserialize(byte[] ser) { List values = _kryo.deserializeFrom(_kryoInput); return new TupleImpl(_context, values, taskId, streamName, id); } catch(IOException e) { - throw new RuntimeException(e); + LOG.error("Failed to deserialize tuple.", e); + return null; } } }