diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java index 0fc85b3e34e..5b1c2002ba2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -156,15 +156,12 @@ public String brokerPath() { * @param partition * @return */ - private int getLeaderFor(String topic, long partition) { + public int getLeaderFor(String topic, long partition) { try { String topicBrokersPath = partitionPath(topic); byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state"); Map value = (Map) JSONValue.parse(new String(hostPortData, "UTF-8")); Integer leader = ((Number) value.get("leader")).intValue(); - if (leader == -1) { - throw new RuntimeException("No leader found for partition " + partition); - } return leader; } catch (RuntimeException e) { throw e; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index a53d5660a04..12a82432e20 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -97,6 +97,11 @@ public void refresh() { for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); + if(_reader.getLeaderFor(id.topic, id.partition) != -1) { + _managers.put(id, man); + }else { + LOG.warn("No leader found for partition " + id.topic + " : " + id.partition); + } } } catch (Exception e) {