Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.storm.serialization.KryoTupleDeserializer;

import clojure.lang.IFn;
import org.apache.storm.tuple.Tuple;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -53,7 +54,10 @@ public void recv(List<TaskMessage> batch) {
KryoTupleDeserializer des = _des.get();
ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
for (TaskMessage message: batch) {
ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
Tuple tuple = des.deserialize(message.message());
if (tuple != null) {
ret.add(new AddressedTuple(message.task(), tuple));
}
}
_cb.invoke(ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import com.esotericsoftware.kryo.io.Input;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class KryoTupleDeserializer implements ITupleDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class);
GeneralTopologyContext _context;
KryoValuesDeserializer _kryo;
SerializationFactory.IdDictionary _ids;
Expand All @@ -50,7 +54,8 @@ public Tuple deserialize(byte[] ser) {
List<Object> values = _kryo.deserializeFrom(_kryoInput);
return new TupleImpl(_context, values, taskId, streamName, id);
} catch(IOException e) {
throw new RuntimeException(e);
LOG.error("Failed to deserialize tuple.", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storm is designed to be a fail fast system, this fix appears to be doing the opposite of that. If we cannot deserialize a tuple we need to make it very very clear to the user that something bad is happening. Sadly in my experience even crashing the worker is not always enough, but in most cases it is. Can we do a System.exit after logging the error instead of returning null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our case this happens because storm's internal message exchange protocol is vulnerable to foreign attack. If deserialization fails, you should warn the user and go on with processing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we need to secure the connection against attach and not mask real errors.

return null;
}
}
}