Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-138]: Pluggable serialization for multilang #84

Merged
merged 22 commits into from
May 19, 2014
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1a19f3d
Shell component serialisation is now plugable with a default JSON ser…
Oct 1, 2013
a7918f8
Moved ShellBolt, ShellProcess and ShellSpout back to their old packag…
Oct 7, 2013
3701bfc
Added comments to the new multilang objects.
Oct 7, 2013
3219a4f
multilang serializer is not handled by stormConf. Renamed Emission an…
Oct 8, 2013
ee0678c
multilang serializer is now handled by stormConf. Renamed Emission an…
Oct 8, 2013
06aad3f
Merge branch 'master' of github.com:jsgilmore/storm
Oct 8, 2013
18c0ce7
Merge branch 'master' of https://github.com/nathanmarz/storm
Oct 9, 2013
52948f5
Added newline to NoOutputException
Oct 9, 2013
63afb94
Re-added need_task_ids option
Oct 9, 2013
32b3718
Added json msg and id parsing.
Oct 9, 2013
5699373
Allow for null boltMsg when polling
Oct 9, 2013
16ba0c7
Removed pure whitespace changes of project.clj and bootstrap.clj
Oct 10, 2013
dfbd045
Added some trailing whitespace to Config.java
Oct 10, 2013
84360dd
Merge branch 'master' of https://github.com/nathanmarz/storm
Nov 7, 2013
8030423
Merge branch 'master' of https://github.com/nathanmarz/storm
Nov 18, 2013
5ba0938
Added debugging info to ShellSpout and pulled in latest upstream master.
Nov 18, 2013
8cde550
Added null check for anchors. Log a component's error stream under it…
Nov 29, 2013
46c02be
Merge branch 'master' of https://github.com/nathanmarz/storm
Apr 23, 2014
f5f41a0
Merge branch 'master' of github.com:jsgilmore/incubator-storm
Apr 23, 2014
660d8c4
Made the multilang serialiser topology specific
May 1, 2014
0c669f4
Converted ids to Objects to be compatible with py rb multilangs
May 1, 2014
bcbd22b
Removed sync comment and changed spoutMsg name to _spoutMsg
May 15, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
Expand Down
6 changes: 6 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_DEBUG = "topology.debug";
public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class;

/**
* The serializer for communication between shell components and non-JVM
* processes
*/
public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer";
public static final Object TOPOLOGY_MULTILANG_SERIALIZER_SCHEMA = String.class;

/**
* Whether or not the master should optimize topologies by running multiple
Expand Down
63 changes: 63 additions & 0 deletions storm-core/src/jvm/backtype/storm/multilang/BoltMsg.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package backtype.storm.multilang;

import java.util.List;

/**
* BoltMsg is an object that represents the data sent from a shell component to
* a bolt process that implements a multi-language protocol. It is the union of
* all data types that a bolt can receive from Storm.
*
* <p>
* BoltMsgs are objects sent to the ISerializer interface, for serialization
* according to the wire protocol implemented by the serializer. The BoltMsg
* class allows for a decoupling between the serialized representation of the
* data and the data itself.
* </p>
*/
public class BoltMsg {
private String id;
private String comp;
private String stream;
private long task;
private List<Object> tuple;

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getComp() {
return comp;
}

public void setComp(String comp) {
this.comp = comp;
}

public String getStream() {
return stream;
}

public void setStream(String stream) {
this.stream = stream;
}

public long getTask() {
return task;
}

public void setTask(long task) {
this.task = task;
}

public List<Object> getTuple() {
return tuple;
}

public void setTuple(List<Object> tuple) {
this.tuple = tuple;
}
}
65 changes: 65 additions & 0 deletions storm-core/src/jvm/backtype/storm/multilang/ISerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package backtype.storm.multilang;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;

/**
* The ISerializer interface describes the methods that an object should
* implement to provide serialization and de-serialization capabilities to
* non-JVM language components.
*/
public interface ISerializer extends Serializable {

/**
* This method sets the input and output streams of the serializer
*
* @param processIn output stream to non-JVM component
* @param processOut input stream from non-JVM component
*/
void initialize(OutputStream processIn, InputStream processOut);

/**
* This method transmits the Storm config to the non-JVM process and
* receives its pid.
*
* @param conf storm configuration
* @param context topology context
* @return process pid
*/
Number connect(Map conf, TopologyContext context) throws IOException,
NoOutputException;

/**
* This method receives a shell message from the non-JVM process
*
* @return shell message
*/
ShellMsg readShellMsg() throws IOException, NoOutputException;

/**
* This method sends a bolt message to a non-JVM bolt process
*
* @param msg bolt message
*/
void writeBoltMsg(BoltMsg msg) throws IOException;

/**
* This method sends a spout message to a non-JVM spout process
*
* @param msg spout message
*/
void writeSpoutMsg(SpoutMsg msg) throws IOException;

/**
* This method sends a list of task IDs to a non-JVM bolt process
*
* @param taskIds list of task IDs
*/
void writeTaskIds(List<Integer> taskIds) throws IOException;
}
164 changes: 164 additions & 0 deletions storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package backtype.storm.multilang;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

/**
* JsonSerializer implements the JSON multilang protocol.
*/
public class JsonSerializer implements ISerializer {
private DataOutputStream processIn;
private BufferedReader processOut;

public void initialize(OutputStream processIn, InputStream processOut) {
this.processIn = new DataOutputStream(processIn);
this.processOut = new BufferedReader(new InputStreamReader(processOut));
}

public Number connect(Map conf, TopologyContext context)
throws IOException, NoOutputException {
JSONObject setupInfo = new JSONObject();
setupInfo.put("pidDir", context.getPIDDir());
setupInfo.put("conf", conf);
setupInfo.put("context", context);
writeMessage(setupInfo);

Number pid = (Number) ((JSONObject) readMessage()).get("pid");
return pid;
}

public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
JSONObject obj = new JSONObject();
obj.put("id", boltMsg.getId());
obj.put("comp", boltMsg.getComp());
obj.put("stream", boltMsg.getStream());
obj.put("task", boltMsg.getTask());
obj.put("tuple", boltMsg.getTuple());
writeMessage(obj);
}

public void writeSpoutMsg(SpoutMsg msg) throws IOException {
JSONObject obj = new JSONObject();
obj.put("command", msg.getCommand());
obj.put("id", msg.getId());
writeMessage(obj);
}

public void writeTaskIds(List<Integer> taskIds) throws IOException {
writeMessage(taskIds);
}

private void writeMessage(Object msg) throws IOException {
writeString(JSONValue.toJSONString(msg));
}

private void writeString(String str) throws IOException {
byte[] strBytes = str.getBytes("UTF-8");
processIn.write(strBytes, 0, strBytes.length);
processIn.writeBytes("\nend\n");
processIn.flush();
}

public ShellMsg readShellMsg() throws IOException, NoOutputException {
JSONObject msg = (JSONObject) readMessage();
ShellMsg shellMsg = new ShellMsg();

String command = (String) msg.get("command");
shellMsg.setCommand(command);

Object id = msg.get("id");
shellMsg.setId(id);

String log = (String) msg.get("msg");
shellMsg.setMsg(log);

String stream = (String) msg.get("stream");
if (stream == null)
stream = Utils.DEFAULT_STREAM_ID;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit: Can we put this in a block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but I don't understand what you mean with "block" here? Do you mean functional block {}?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I meant. Not a big deal though.

shellMsg.setStream(stream);

Object taskObj = msg.get("task");
if (taskObj != null) {
shellMsg.setTask((Long) taskObj);
} else {
shellMsg.setTask(0);
}

Object need_task_ids = msg.get("need_task_ids");
if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
shellMsg.setNeedTaskIds(true);
} else {
shellMsg.setNeedTaskIds(false);
}

shellMsg.setTuple((List) msg.get("tuple"));

List<Tuple> anchors = new ArrayList<Tuple>();
Object anchorObj = msg.get("anchors");
if (anchorObj != null) {
if (anchorObj instanceof String) {
anchorObj = Arrays.asList(anchorObj);
}
for (Object o : (List) anchorObj) {
shellMsg.addAnchor((String) o);
}
}

return shellMsg;
}

private Object readMessage() throws IOException, NoOutputException {
String string = readString();
Object msg = JSONValue.parse(string);
if (msg != null) {
return msg;
} else {
throw new IOException("unable to parse: " + string);
}
}

private String readString() throws IOException, NoOutputException {
StringBuilder line = new StringBuilder();

// synchronized (processOut) {
while (true) {
String subline = processOut.readLine();
if (subline == null) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Pipe to subprocess seems to be broken!");
if (line.length() == 0) {
errorMessage.append(" No output read.\n");
} else {
errorMessage.append(" Currently read output: "
+ line.toString() + "\n");
}
errorMessage.append("Serializer Exception:\n");
throw new NoOutputException(errorMessage.toString());
}
if (subline.equals("end")) {
break;
}
if (line.length() != 0) {
line.append("\n");
}
line.append(subline);
}
// }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented synchronized can be removed if it is not needed. Looks like it may have come from other code that already had it commented out.

return line.toString();
}
}
23 changes: 23 additions & 0 deletions storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package backtype.storm.multilang;

/**
* A NoOutputException states that no data has been received from the connected
* non-JVM process.
*/
public class NoOutputException extends Exception {
public NoOutputException() {
super();
}

public NoOutputException(String message) {
super(message);
}

public NoOutputException(String message, Throwable cause) {
super(message, cause);
}

public NoOutputException(Throwable cause) {
super(cause);
}
}
Loading