Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
maybeAddCheckpointSpout();

if (_spouts.size() == 0) {
throw new IllegalArgumentException("Spouts is not set");
}

for(String boltId: _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
bolt = maybeAddCheckpointTupleForwarder(bolt);
Expand Down Expand Up @@ -179,8 +184,13 @@ public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentExc
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);

if (_bolts.containsKey(id)) {
throw new IllegalArgumentException("Duplicate bolt id found " + id);
} else {
_bolts.put(id, bolt);
return new BoltGetter(id);
}
}

/**
Expand Down Expand Up @@ -339,8 +349,13 @@ public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumen
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, spout, parallelism_hint);
_spouts.put(id, spout);
return new SpoutGetter(id);

if (_spouts.containsKey(id)) {
throw new IllegalArgumentException("Duplicate spout id found " + id);
} else {
_spouts.put(id, spout);
return new SpoutGetter(id);
}
}

public void setStateSpout(String id, IRichStateSpout stateSpout) throws IllegalArgumentException {
Expand Down