Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class TridentTopology {
final DefaultDirectedGraph<Node, IndexedEdge> _graph;
final Map<String, List<Node>> _colocate;
final UniqueIdGen _gen;
private final static String DRPC_DEFAULT_FILED = "args";


public TridentTopology() {
this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
Expand Down Expand Up @@ -145,27 +147,35 @@ public Stream newStream(String txId, ITridentDataSource dataSource) {
}

public Stream newDRPCStream(String function) {
return newDRPCStream(new DRPCSpout(function));
return newDRPCStream (new DRPCSpout(function), DRPC_DEFAULT_FILED);
}


public Stream newDRPCStream(String function, String field) {
return newDRPCStream (new DRPCSpout(function), field);
}

public Stream newDRPCStream(String function, ILocalDRPC server) {
return newDRPCStream (function, server, DRPC_DEFAULT_FILED);
}

public Stream newDRPCStream(String function, ILocalDRPC server, String field) {
DRPCSpout spout;
if(server==null) {
spout = new DRPCSpout(function);
} else {
spout = new DRPCSpout(function, server);
}
return newDRPCStream(spout);
return newDRPCStream (spout, field);
}

private Stream newDRPCStream(DRPCSpout spout) {
private Stream newDRPCStream(DRPCSpout spout, String field) {
// TODO: consider adding a shuffle grouping after the spout to avoid so much routing of the args/return-info all over the place
// (at least until its possible to just pack bolt logic into the spout itself)

Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.DRPC);
Stream nextStream = addNode(n);
// later on, this will be joined back with return-info and all the results
return nextStream.project(new Fields("args"));
return nextStream.project(new Fields(field));
}

public TridentState newStaticState(StateFactory factory) {
Expand Down