From 556ab52fdf9cffe39994cb601f5103d1219c4052 Mon Sep 17 00:00:00 2001 From: jinhong-lu Date: Wed, 24 Feb 2016 14:49:09 +0800 Subject: [PATCH 1/3] fix storm-1065: kafka-partition can not find leader in zookeeper --- .../org/apache/storm/kafka/KafkaUtils.java | 46 +++++++++++++++++++ .../apache/storm/kafka/PartitionManager.java | 7 +++ .../org/apache/storm/kafka/ZkCoordinator.java | 5 ++ 3 files changed, 58 insertions(+) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index a2be825a326..62103e1879d 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -24,6 +24,12 @@ import org.apache.storm.kafka.trident.StaticBrokerReader; import org.apache.storm.kafka.trident.ZkBrokerReader; import org.apache.storm.metric.api.IMetric; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +55,7 @@ public class KafkaUtils { public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); private static final int NO_OFFSET = -5; + private static CuratorFramework _curator; public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) { @@ -278,4 +285,43 @@ private static void logPartitionMapping(int totalTasks, int taskIndex, List value = (Map) JSONValue.parse(new String(hostPortData, "UTF-8")); + Integer leader = ((Number) value.get("leader")).intValue(); + if (leader == -1) { + return false; + } else { + return true; + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index dbf70a0a9a2..30c48cc4b32 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -59,6 +59,7 @@ public class PartitionManager { DynamicPartitionConnections _connections; ZkState _state; Map _stormConf; + Boolean _isActive = true; long numberFailed, numberAcked; public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { _partition = id; @@ -87,6 +88,12 @@ public PartitionManager(DynamicPartitionConnections connections, String topology } catch (Throwable e) { LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); } + + + //if leader == -1, set the manager to inactive. + if (!KafkaUtils.checkLeader(stormConf, spoutConfig, id)) { + _isActive = false; + } String topic = _partition.topic; Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index a53d5660a04..dfc096a0284 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -97,6 +97,11 @@ public void refresh() { for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); + if (man._isActive) { + _managers.put(id, man); + }else { + LOG.warn("No leader found for partition " + id.topic + " : " + id.partition); + } } } catch (Exception e) { From 536beeec2bc100470669e679e3ba265647fa654b Mon Sep 17 00:00:00 2001 From: jinhong-lu Date: Fri, 26 Feb 2016 14:44:03 +0800 Subject: [PATCH 2/3] a simple way to fix storm-1065 --- .../storm/kafka/DynamicBrokersReader.java | 5 +-- .../org/apache/storm/kafka/KafkaUtils.java | 38 ------------------- .../apache/storm/kafka/PartitionManager.java | 6 --- .../org/apache/storm/kafka/ZkCoordinator.java | 2 +- ...OpaquePartitionedTridentSpoutExecutor.java | 1 + 5 files changed, 3 insertions(+), 49 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java index 0fc85b3e34e..5b1c2002ba2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -156,15 +156,12 @@ public String brokerPath() { * @param partition * @return */ - private int getLeaderFor(String topic, long partition) { + public int getLeaderFor(String topic, long partition) { try { String topicBrokersPath = partitionPath(topic); byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state"); Map value = (Map) JSONValue.parse(new String(hostPortData, "UTF-8")); Integer leader = ((Number) value.get("leader")).intValue(); - if (leader == -1) { - throw new RuntimeException("No leader found for partition " + partition); - } return leader; } catch (RuntimeException e) { throw e; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 62103e1879d..01b8361185e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -55,7 +55,6 @@ public class KafkaUtils { public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); private static final int NO_OFFSET = -5; - private static CuratorFramework _curator; public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) { @@ -286,42 +285,5 @@ public static String taskId(int taskIndex, int totalTasks) { return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; } - public static Boolean checkLeader(Map conf, SpoutConfig spoutConfig, Partition id) { - - String topicBrokersPath = ""; - String zkStr = ""; - - if (spoutConfig.hosts instanceof ZkHosts) { - ZkHosts hosts = (ZkHosts) spoutConfig.hosts; - topicBrokersPath = hosts.brokerZkPath + "/topics/" + id.topic + "/partitions"; - zkStr = hosts.brokerZkStr; - } - try { - _curator = CuratorFrameworkFactory.newClient(zkStr, - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), - new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); - _curator.start(); - } catch (Exception ex) { - LOG.error("Couldn't connect to zookeeper", ex); - throw new RuntimeException(ex); - } - - try { - byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + id.partition + "/state"); - Map value = (Map) JSONValue.parse(new String(hostPortData, "UTF-8")); - Integer leader = ((Number) value.get("leader")).intValue(); - if (leader == -1) { - return false; - } else { - return true; - } - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 30c48cc4b32..36687098caf 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -59,7 +59,6 @@ public class PartitionManager { DynamicPartitionConnections _connections; ZkState _state; Map _stormConf; - Boolean _isActive = true; long numberFailed, numberAcked; public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { _partition = id; @@ -89,11 +88,6 @@ public PartitionManager(DynamicPartitionConnections connections, String topology LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); } - - //if leader == -1, set the manager to inactive. - if (!KafkaUtils.checkLeader(stormConf, spoutConfig, id)) { - _isActive = false; - } String topic = _partition.topic; Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index dfc096a0284..12a82432e20 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -97,7 +97,7 @@ public void refresh() { for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); - if (man._isActive) { + if(_reader.getLeaderFor(id.topic, id.partition) != -1) { _managers.put(id, man); }else { LOG.warn("No leader found for partition " + id.topic + " : " + id.partition); diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java index 7c43961b7d9..9e530e94fe2 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java @@ -98,6 +98,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl List partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); List myPartitions = new ArrayList<>(); + //计算自己这个task负责哪些分区 for(int i=_index; i < partitions.size(); i+=_numTasks) { ISpoutPartition p = partitions.get(i); String id = p.getId(); From 58f7fcbd885f7299ad49e4a0858dc210db8a83ed Mon Sep 17 00:00:00 2001 From: jinhong-lu Date: Fri, 26 Feb 2016 14:49:26 +0800 Subject: [PATCH 3/3] a simple way to fix storm-1065 --- .../src/jvm/org/apache/storm/kafka/KafkaUtils.java | 8 -------- .../src/jvm/org/apache/storm/kafka/PartitionManager.java | 1 - .../spout/OpaquePartitionedTridentSpoutExecutor.java | 1 - 3 files changed, 10 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 01b8361185e..a2be825a326 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -24,12 +24,6 @@ import org.apache.storm.kafka.trident.StaticBrokerReader; import org.apache.storm.kafka.trident.ZkBrokerReader; import org.apache.storm.metric.api.IMetric; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.apache.storm.Config; -import org.apache.storm.utils.Utils; -import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -284,6 +278,4 @@ private static void logPartitionMapping(int totalTasks, int taskIndex, List partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); List myPartitions = new ArrayList<>(); - //计算自己这个task负责哪些分区 for(int i=_index; i < partitions.size(); i+=_numTasks) { ISpoutPartition p = partitions.get(i); String id = p.getId();