diff --git a/.gitignore b/.gitignore index 08b217afd23..5d7641db7c3 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ _site dependency-reduced-pom.xml derby.log metastore_db +*.versionsBackup .settings/ .project .classpath diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index 5a34b55baa7..efb6a9ca4de 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -52,17 +52,24 @@ The optional ClientId is used as a part of the ZooKeeper path where the spout's There are 2 extensions of KafkaConfig currently in use. Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling -behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely -identify your spout. +behavior specific to KafkaSpout. The zkRoot will be used as the root to store your consumer's offset if you choose ZooKeeper as the storage. +The id should uniquely identify your spout. ```java -public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); public SpoutConfig(BrokerHosts hosts, String topic, String id); +public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); ``` In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves: ```java - // setting for how often to save the current Kafka offset to ZooKeeper + // setting for how often to save the current Kafka offset public long stateUpdateIntervalMs = 2000; + // offset state information storage. valid options are zookeeper and kafka + public String stateStore = "zookeeper"; + // timeout in millis for state read/write operations + public int stateOpTimeout = 5000; + // max retries allowed for state read/write operations + public int stateOpMaxRetry = 3; + // Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail(). // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent @@ -84,6 +91,8 @@ The KafkaConfig class also has bunch of public variables that controls your appl public int fetchMaxWait = 10000; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); + public boolean ignoreStoredOffsets = false; + // ignoreZkOffsets is deprecated. It is still honored for backward compatibility, but you should switch to ignoreStoredOffsets. public boolean ignoreZkOffsets = false; public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); public long maxOffsetBehind = Long.MAX_VALUE; @@ -148,24 +157,31 @@ As shown in the above KafkaConfig properties, you can control from where in the setting `KafkaConfig.startOffsetTime` as follows: 1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards) -2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messsages that are being written to the topic) +2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic) 3. A Unix timestamp aka seconds since the epoch (e.g. via `System.currentTimeMillis()`): see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?)
in the Kafka FAQ -As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information -under the ZooKeeper path `SpoutConfig.zkRoot+ "/" + SpoutConfig.id`. In the case of failures it recovers from the last -written offset in ZooKeeper. +As the topology runs the Kafka spout keeps track of the offsets it has read and emitted. The Kafka spout offers two built-in options for offset storage, which +can be configured by setting `SpoutConfig.stateStore`. By default, the `zookeeper` option is used, which stores offset state information +under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`. The second option is `kafka`, which stores offset state information using +Kafka's built-in offset management API. In addition, you may supply your own custom state store implementation by providing the fully qualified class name of your +implementation. The custom state store must implement the org.apache.storm.kafka.StateStore interface and must have a public constructor that takes two arguments, as shown below: ```java + public MyStateStore(Map stormConf, SpoutConfig spoutConfig) ``` + +In the case of failures the Kafka spout recovers from the last written offset. -> **Important:** When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id` -> were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the -> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case. +> **Important:** When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` (if `zookeeper` is chosen as the storage option) +> and `SpoutConfig.id` were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the +> offsets) from storage -- which may lead to unexpected behavior and/or to data loss, depending on your use case. This means that when a topology has run once the setting `KafkaConfig.startOffsetTime` will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in -ZooKeeper to determine from where it should begin (more precisely: resume) reading. -If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should -set the parameter `KafkaConfig.ignoreZkOffsets` to `true`. If `true`, the spout will always begin reading from the -offset defined by `KafkaConfig.startOffsetTime` as described above. +storage to determine from where it should begin (more precisely: resume) reading. +If you want to force the spout to ignore any previously stored consumer state information, then you should +set the parameter `KafkaConfig.ignoreStoredOffsets` to `true` (`KafkaConfig.ignoreZkOffsets` can be used as an alias for backward compatibility). +If `true`, the spout will always begin reading from the offset defined by `KafkaConfig.startOffsetTime` as described above.
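For example, to keep offsets in Kafka's built-in offset management instead of ZooKeeper, you only need to pick the `kafka` state store when building the spout. This is a minimal sketch; the ZooKeeper connect string, topic name, and spout id are placeholders:

```java
BrokerHosts hosts = new ZkHosts("zkhost:2181");

// The three-argument constructor can be used here because the kafka state
// store does not need a zkRoot.
SpoutConfig spoutConfig = new SpoutConfig(hosts, "myTopic", "mySpoutId");
spoutConfig.stateStore = "kafka";
spoutConfig.stateOpTimeout = 5000; // millis per state read/write attempt
spoutConfig.stateOpMaxRetry = 3;   // retries before a state operation fails

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```

A custom state store only needs the two-argument constructor plus the `StateStore` read/write/close methods. The sketch below is illustrative only (`MyStateStore` and its in-memory map are not part of storm-kafka); a real implementation would write to a durable backend:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MyStateStore implements StateStore {
    // illustrative only: this state is lost when the worker dies
    private final Map<String, Map> states = new ConcurrentHashMap<String, Map>();

    public MyStateStore(Map stormConf, SpoutConfig spoutConfig) {
        // read any connection settings you need from stormConf / spoutConfig
    }

    @Override
    public Map readState(Partition p) {
        return states.get(p.getId());
    }

    @Override
    public void writeState(Partition p, Map state) {
        states.put(p.getId(), state);
    }

    @Override
    public void close() {
        states.clear();
    }
}
```

It is then registered by class name: `spoutConfig.stateStore = MyStateStore.class.getName();`.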
## Using storm-kafka with different versions of Scala diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index 43b7796c597..502604fc02f 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -153,5 +153,34 @@ <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> </dependency> + + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>2.2.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.5</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java index 0fc85b3e34e..2ad90daa2c1 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java @@ -37,7 +37,7 @@ public class DynamicBrokersReader { - public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class); + private static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class); private CuratorFramework _curator; private String _zkPath; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java index 6d30139d22c..19f04527883 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java @@ -30,7 +30,7 @@ public class DynamicPartitionConnections { - public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class); + private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class); static class ConnectionInfo { SimpleConsumer consumer; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java index e1e1d242cae..f611197b356 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java @@ -34,12 +34,22 @@ public class KafkaConfig implements Serializable { public int fetchMaxWait = 10000; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); - public boolean ignoreZkOffsets = false; public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); public long maxOffsetBehind = Long.MAX_VALUE; public boolean useStartOffsetTimeIfOffsetOutOfRange = true; public int metricsTimeBucketSizeInSecs = 60; + /** + * Whether the spout should ignore the previously stored offsets when it starts. + */ + public boolean ignoreStoredOffsets = false; + + /** + * @deprecated + * Use {@link KafkaConfig#ignoreStoredOffsets} instead. This field is still honored for backward compatibility.
+ */ + public boolean ignoreZkOffsets = false; + public KafkaConfig(BrokerHosts hosts, String topic) { this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId()); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index d1da446347b..e41fd4410fe 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -18,8 +18,7 @@ package org.apache.storm.kafka; import com.google.common.base.Strings; - -import org.apache.storm.Config; +import kafka.message.Message; import org.apache.storm.kafka.PartitionManager.KafkaMessageId; import org.apache.storm.metric.api.IMetric; import org.apache.storm.spout.SpoutOutputCollector; @@ -29,27 +28,41 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; // TODO: need to add blacklisting // TODO: need to make a best effort to not re-emit messages if don't have to public class KafkaSpout extends BaseRichSpout { - static enum EmitState { + + public static class MessageAndRealOffset { + public Message msg; + public long offset; + + public MessageAndRealOffset(Message msg, long offset) { + this.msg = msg; + this.offset = offset; + } + } + + enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED } - public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); SpoutConfig _spoutConfig; SpoutOutputCollector _collector; PartitionCoordinator _coordinator; DynamicPartitionConnections _connections; - ZkState _state; + PartitionStateManagerFactory _partitionStateManagerFactory; long _lastUpdateMs = 0; - int _currPartitionIndex = 0; public KafkaSpout(SpoutConfig spoutConf) { @@ -60,32 +73,18 @@ public KafkaSpout(SpoutConfig spoutConf) { public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; String topologyInstanceId = context.getStormId(); - Map stateConf = new HashMap(conf); - List zkServers = _spoutConfig.zkServers; - if (zkServers == null) { - zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); - } - Integer zkPort = _spoutConfig.zkPort; - if (zkPort == null) { - zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); - } - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); - _state = new ZkState(stateConf); _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); + _partitionStateManagerFactory = new PartitionStateManagerFactory(conf, _spoutConfig); // using TransactionalState like this is a hack int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (_spoutConfig.hosts instanceof StaticHosts) { - _coordinator = new StaticCoordinator(_connections, conf, - _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + _coordinator = new StaticCoordinator(_connections, _partitionStateManagerFactory, conf, _spoutConfig, + context.getThisTaskIndex(), totalTasks, topologyInstanceId); } else { - _coordinator = new ZkCoordinator(_connections, conf, - _spoutConfig, _state, 
context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + _coordinator = new ZkCoordinator(_connections, _partitionStateManagerFactory, conf, _spoutConfig, + context.getThisTaskIndex(), totalTasks, topologyInstanceId); } context.registerMetric("kafkaOffset", new IMetric() { @@ -94,7 +93,7 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle @Override public Object getValueAndReset() { List pms = _coordinator.getMyManagedPartitions(); - Set latestPartitions = new HashSet(); + Set latestPartitions = new HashSet<>(); for (PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); } @@ -121,7 +120,7 @@ public Object getValueAndReset() { @Override public void close() { - _state.close(); + _partitionStateManagerFactory.close(); } @Override diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaStateStore.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaStateStore.java new file mode 100644 index 00000000000..48edc161532 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaStateStore.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import com.google.common.collect.Maps; +import kafka.api.ConsumerMetadataRequest; +import kafka.common.ErrorMapping; +import kafka.common.OffsetAndMetadata; +import kafka.common.OffsetMetadataAndError; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.network.BlockingChannel; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class KafkaStateStore implements StateStore { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStateStore.class); + + private static final int OFFSET_MANAGER_DISCOVERY_TIMEOUT = 5000; + private static final long OFFSET_MANAGER_DISCOVERY_RETRY_BACKOFF = 1000L; + private static final int OFFSET_MANAGER_DISCOVERY_MAX_RETRY = 3; + + private KafkaStateStoreConfig _config; + private int _correlationId = 0; + // https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java + private volatile BlockingChannel _offsetManager; + + public KafkaStateStore(Map stormConf, SpoutConfig spoutConfig) { + this(new KafkaStateStoreConfig(stormConf, spoutConfig)); + } + + public KafkaStateStore(KafkaStateStoreConfig config) { + this._config = config; + } + + @Override + public void writeState(Partition p, Map state) { + assert state.containsKey("offset"); + + LOG.debug("Writing state data {} for partition {}:{}.", state, p.host, p.partition); + Long offsetOfPartition = (Long) state.get("offset"); + String stateData = JSONValue.toJSONString(state); + write(offsetOfPartition, stateData, p); + } + + @Override + public Map readState(Partition p) { + LOG.debug("Reading state data for partition {}:{}.", p.host, p.partition); + String raw = read(p); + if (raw == null) { + LOG.warn("No state found for partition {}:{} at this time.", p.host, p.partition); + return null; + } + + LOG.debug("Retrieved state {} for partition {}:{}.", raw, p.host, p.partition); + Map state = (Map) JSONValue.parse(raw); + return state; + } + + @Override + public void close() { + if (_offsetManager != null) { + _offsetManager.disconnect(); + _offsetManager = null; + } + LOG.info("Kafka state store closed."); + } + + private BlockingChannel getOffsetManager(Partition partition) { + if (_offsetManager == null) { + _offsetManager = locateOffsetManager(partition); + } + return _offsetManager; + } + + // We normally only need to locate the offset manager once. Other cases, such as the offset manager + // being relocated to another broker, should be rare, so it is OK to synchronize here.
+ // + // although we take a particular partition to locate the offset manager, the instance of the + // offset manager should apply to the entire consumer group + private synchronized BlockingChannel locateOffsetManager(Partition partition) { + + // if another invocation has already located the offset manager, reuse it + if (_offsetManager != null) { + return _offsetManager; + } + + LOG.info("Trying to locate the offset manager by asking broker {}:{}.", partition.host.host, partition.host.port); + BlockingChannel channel = new BlockingChannel(partition.host.host, partition.host.port, + BlockingChannel.UseDefaultBufferSize(), + BlockingChannel.UseDefaultBufferSize(), + OFFSET_MANAGER_DISCOVERY_TIMEOUT /* read timeout in millis */); + channel.connect(); + + ConsumerMetadataResponse metadataResponse = null; + long backoffMillis = OFFSET_MANAGER_DISCOVERY_RETRY_BACKOFF; + int maxRetry = OFFSET_MANAGER_DISCOVERY_MAX_RETRY; + int retryCount = 0; + // Retrying here usually only matters when the internal offsets topic does not exist yet and we need to + // wait until the topic is automatically created and its metadata is populated across the cluster, so the + // retry policy is hard-coded. One scenario where this happens is during unit tests. + while (retryCount < maxRetry) { + channel.send(new ConsumerMetadataRequest(_config.getConsumerId(), ConsumerMetadataRequest.CurrentVersion(), + _correlationId++, _config.getClientId())); + metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); + assert (metadataResponse != null); + + // only retry if the error indicates the offset manager is temporarily unavailable + if (metadataResponse.errorCode() == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { + LOG.warn("Offset manager is not available yet. Will retry in {} ms", backoffMillis); + retryCount++; + try { + Thread.sleep(backoffMillis); + } catch (InterruptedException e) { + // ignore and retry after the back-off + } + } else { + break; + } + } + + assert (metadataResponse != null); + if (metadataResponse.errorCode() == ErrorMapping.NoError()) { + kafka.cluster.Broker offsetManager = metadataResponse.coordinator(); + if (!offsetManager.host().equals(partition.host.host) + || offsetManager.port() != partition.host.port) { + LOG.info("Reconnect to the offset manager on a different broker {}:{}.", offsetManager.host(), offsetManager.port()); + channel.disconnect(); + channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), + BlockingChannel.UseDefaultBufferSize(), + BlockingChannel.UseDefaultBufferSize(), + _config.getStateOpTimeout() /* read timeout in millis */); + channel.connect(); + } + } else { + throw new RuntimeException("Unable to locate offset manager. 
Error code is " + metadataResponse.errorCode()); + } + + LOG.info("Successfully located offset manager."); + return channel; + } + + private String attemptToRead(Partition partition) { + List partitions = new ArrayList(); + TopicAndPartition thisTopicPartition = new TopicAndPartition(_config.getTopic(), partition.partition); + partitions.add(thisTopicPartition); + OffsetFetchRequest fetchRequest = new OffsetFetchRequest( + _config.getConsumerId(), + partitions, + (short) 1, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper + _correlationId++, + _config.getClientId()); + + BlockingChannel offsetManager = getOffsetManager(partition); + offsetManager.send(fetchRequest.underlying()); + OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(offsetManager.receive().buffer()); + OffsetMetadataAndError result = fetchResponse.offsets().get(thisTopicPartition); + if (result.error() == ErrorMapping.NoError()) { + String retrievedMetadata = result.metadata(); + if (retrievedMetadata != null) { + return retrievedMetadata; + } else { + // let it return null, this may be the first time it is called before the state is persisted + return null; + } + + } else { + throw new RuntimeException("OffsetMetadataAndError:" + result.error()); + } + } + + private String read(Partition partition) { + int attemptCount = 0; + while (true) { + try { + return attemptToRead(partition); + + } catch (RuntimeException re) { + _offsetManager = null; + if (++attemptCount > _config.getStateOpMaxRetry()) { + throw re; + } else { + LOG.warn("Attempt " + attemptCount + " out of " + _config.getStateOpMaxRetry() + + ". Failed to fetch state for partition " + partition.partition + + " of topic " + _config.getTopic() + ". Error code is " + re.getMessage()); + } + } + } + } + + private void attemptToWrite(long offsetOfPartition, String state, Partition partition) { + long now = System.currentTimeMillis(); + Map offsets = Maps.newLinkedHashMap(); + TopicAndPartition thisTopicPartition = new TopicAndPartition(_config.getTopic(), partition.partition); + offsets.put(thisTopicPartition, new OffsetAndMetadata( + offsetOfPartition, + state, + now)); + OffsetCommitRequest commitRequest = new OffsetCommitRequest( + _config.getConsumerId(), + offsets, + _correlationId, + _config.getClientId(), + (short) 1); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper + + BlockingChannel offsetManager = getOffsetManager(partition); + offsetManager.send(commitRequest.underlying()); + OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(offsetManager.receive().buffer()); + if (commitResponse.hasError()) { + // note: there should be only one error here, for the single partition in the request + for (Object partitionErrorCode : commitResponse.errors().values()) { + if (partitionErrorCode.equals(ErrorMapping.OffsetMetadataTooLargeCode())) { + throw new RuntimeException("Data is too big. The state object is " + state); + } else { + throw new RuntimeException("OffsetCommitResponse:" + partitionErrorCode); + } + } + } + } + + private void write(Long offsetOfPartition, String state, Partition partition) { + int attemptCount = 0; + while (true) { + try { + attemptToWrite(offsetOfPartition, state, partition); + return; + + } catch (RuntimeException re) { + _offsetManager = null; + if (++attemptCount > _config.getStateOpMaxRetry()) { + throw re; + } else { + LOG.warn("Attempt " + attemptCount + " out of " + _config.getStateOpMaxRetry() + + ". 
Failed to save state for partition " + partition.partition + + " of topic " + _config.getTopic() + ". Error code is: " + re.getMessage()); + } + } + } + } + + public static class KafkaStateStoreConfig { + private final String topic; + private final int stateOpTimeout; + private final int stateOpMaxRetry; + private final String consumerId; + private final String clientId; + + public KafkaStateStoreConfig(Map stormConf, SpoutConfig spoutConfig) { + this.topic = spoutConfig.topic; + this.stateOpMaxRetry = spoutConfig.stateOpMaxRetry; + this.stateOpTimeout = spoutConfig.stateOpTimeout; + this.consumerId = spoutConfig.id; + this.clientId = spoutConfig.clientId; + } + + public String getTopic() { + return topic; + } + + public int getStateOpMaxRetry() { + return stateOpMaxRetry; + } + + public int getStateOpTimeout() { + return stateOpTimeout; + } + + public String getConsumerId() { + return consumerId; + } + + public String getClientId() { + return clientId; + } + } +} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index a2be825a326..fb56727df28 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -47,7 +47,7 @@ public class KafkaUtils { - public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); private static final int NO_OFFSET = -5; @@ -72,7 +72,7 @@ public static long getOffset(SimpleConsumer consumer, String topic, int partitio OffsetRequest request = new OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition); + long[] offsets = (long[])consumer.getOffsetsBefore(request).offsets(topic, partition); if (offsets.length > 0) { return offsets[0]; } else { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index dbf70a0a9a2..f1c9b019923 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -19,7 +19,9 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; - +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; import org.apache.storm.Config; import org.apache.storm.kafka.KafkaSpout.EmitState; import org.apache.storm.kafka.trident.MaxMetric; @@ -31,42 +33,46 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; public class PartitionManager { - public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); + private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); private final CombinedMetric _fetchAPILatencyMax; private final ReducedMetric _fetchAPILatencyMean; private final CountMetric _fetchAPICallCount; private final CountMetric 
_fetchAPIMessageCount; - Long _emittedToOffset; + + private Long _emittedToOffset; // _pending key = Kafka offset, value = time at which the message was first submitted to the topology - private SortedMap _pending = new TreeMap(); + private final SortedMap _pending = new TreeMap(); private final FailedMsgRetryManager _failedMsgRetryManager; - // retryRecords key = Kafka offset, value = retry info for the given message - Long _committedTo; - LinkedList _waitingToEmit = new LinkedList(); - Partition _partition; - SpoutConfig _spoutConfig; - String _topologyInstanceId; - SimpleConsumer _consumer; - DynamicPartitionConnections _connections; - ZkState _state; - Map _stormConf; - long numberFailed, numberAcked; - public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { + private Long _committedTo; + private final LinkedList _waitingToEmit = new LinkedList(); + private final Partition _partition; + private final SpoutConfig _spoutConfig; + private final String _topologyInstanceId; + private final SimpleConsumer _consumer; + private final DynamicPartitionConnections _connections; + private final PartitionStateManager _partitionStateManager; + private final Map _stormConf; + private long numberFailed, numberAcked; + + public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, PartitionStateManager partitionStateManager, + Map stormConf, SpoutConfig spoutConfig, Partition id) { _partition = id; _connections = connections; _spoutConfig = spoutConfig; _topologyInstanceId = topologyInstanceId; _consumer = connections.register(id.host, id.topic, id.partition); - _state = state; + _partitionStateManager = partitionStateManager; _stormConf = stormConf; numberAcked = numberFailed = 0; @@ -74,18 +80,17 @@ public PartitionManager(DynamicPartitionConnections connections, String topology _spoutConfig.retryDelayMultiplier, _spoutConfig.retryDelayMaxMs); + String jsonTopologyId = null; Long jsonOffset = null; - String path = committedPath(); try { - Map json = _state.readJSON(path); - LOG.info("Read partition information from: " + path + " --> " + json ); + Map json = _partitionStateManager.getState(); if (json != null) { jsonTopologyId = (String) ((Map) json.get("topology")).get("id"); jsonOffset = (Long) json.get("offset"); } } catch (Throwable e) { - LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); + LOG.warn("Error reading and/or parsing partition state", e); } String topic = _partition.topic; @@ -94,8 +99,8 @@ public PartitionManager(DynamicPartitionConnections connections, String topology if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? 
_committedTo = currentOffset; LOG.info("No partition information found, using configuration to determine offset"); - } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { - _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); + } else if (!topologyInstanceId.equals(jsonTopologyId) && (spoutConfig.ignoreStoredOffsets || spoutConfig.ignoreZkOffsets)) { + _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime); LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); } else { _committedTo = jsonOffset; @@ -134,31 +139,31 @@ public EmitState next(SpoutOutputCollector collector) { fill(); } while (true) { - MessageAndOffset toEmit = _waitingToEmit.pollFirst(); + KafkaSpout.MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); if (toEmit == null) { return EmitState.NO_EMITTED; } Iterable> tups; if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { - tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset()); + tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset); } else { - tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic); + tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition.topic); } if ((tups != null) && tups.iterator().hasNext()) { if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { for (List tup : tups) { - collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset())); + collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); } } else { for (List tup : tups) { - collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset())); + collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); } } break; } else { - ack(toEmit.offset()); + ack(toEmit.offset); } } if (!_waitingToEmit.isEmpty()) { @@ -224,7 +229,7 @@ private void fill() { if (!_pending.containsKey(cur_offset)) { _pending.put(cur_offset, System.currentTimeMillis()); } - _waitingToEmit.add(msg); + _waitingToEmit.add(new KafkaSpout.MessageAndRealOffset(msg.message(), msg.offset())); _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset); if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) { this._failedMsgRetryManager.retryStarted(cur_offset); @@ -266,28 +271,26 @@ public void fail(Long offset) { public void commit() { long lastCompletedOffset = lastCompletedOffset(); if (_committedTo != lastCompletedOffset) { - LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); + LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") for " + _partition + " for topology: " + _topologyInstanceId); Map data = (Map) ImmutableMap.builder() - .put("topology", ImmutableMap.of("id", _topologyInstanceId, + .put("topology", ImmutableMap.of( + "id", _topologyInstanceId, "name", _stormConf.get(Config.TOPOLOGY_NAME))) .put("offset", lastCompletedOffset) .put("partition", _partition.partition) - .put("broker", ImmutableMap.of("host", _partition.host.host, + .put("broker", ImmutableMap.of( + "host", _partition.host.host, "port", _partition.host.port)) .put("topic", _partition.topic).build(); - _state.writeJSON(committedPath(), data); + 
_partitionStateManager.writeState(data); _committedTo = lastCompletedOffset; - LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); + LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") for " + _partition + " for topology: " + _topologyInstanceId); } else { LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId); } } - private String committedPath() { - return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId(); - } - public long lastCompletedOffset() { if (_pending.isEmpty()) { return _emittedToOffset; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManager.java new file mode 100644 index 00000000000..4939c8fef27 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManager.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +/** + * Manages the offset state of a single partition. Each instance wraps one partition and delegates state + * reads and writes to the configured {@link StateStore}. + */ +public class PartitionStateManager implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(PartitionStateManager.class); + + private final Partition _partition; + private final StateStore _stateStore; + + public PartitionStateManager(Partition partition, StateStore stateStore) { + this._partition = partition; + this._stateStore = stateStore; + } + + public Map getState() { + return _stateStore.readState(_partition); + } + + public void writeState(Map state) { + _stateStore.writeState(_partition, state); + } + + @Override + public void close() throws IOException { + if (_stateStore != null) { + _stateStore.close(); + } + LOG.info("State store closed."); + } +} \ No newline at end of file diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManagerFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManagerFactory.java new file mode 100644 index 00000000000..a924d597cdf --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionStateManagerFactory.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import com.google.common.base.Strings; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; + +import static org.apache.storm.kafka.SpoutConfig.STATE_STORE_KAFKA; +import static org.apache.storm.kafka.SpoutConfig.STATE_STORE_ZOOKEEPER; + +public class PartitionStateManagerFactory { + private static final Logger LOG = LoggerFactory.getLogger(PartitionStateManagerFactory.class); + + private StateStore _stateStore; + + private StateStore createZkStateStore(Map conf, SpoutConfig spoutConfig) { + return new ZkStateStore(conf, spoutConfig); + } + + private StateStore createKafkaStateStore(Map conf, SpoutConfig spoutConfig) { + return new KafkaStateStore(conf, spoutConfig); + } + + private StateStore createCustomStateStore(Map conf, SpoutConfig spoutConfig, String customStateStoreClazzName) { + + Class customStateStoreClazz; + try { + customStateStoreClazz = Class.forName(customStateStoreClazzName); + } catch (ClassNotFoundException e) { + throw new RuntimeException(String.format("Invalid value defined for SpoutConfig.stateStore: %s. " + "Valid values are %s, %s, or the fully qualified class name of a custom state store (default: %s).", + spoutConfig.stateStore, STATE_STORE_ZOOKEEPER, STATE_STORE_KAFKA, STATE_STORE_ZOOKEEPER)); + } + + if (!StateStore.class.isAssignableFrom(customStateStoreClazz)) { + throw new RuntimeException(String.format("Invalid custom state store class: %s. " + "Must implement interface " + StateStore.class.getCanonicalName(), + spoutConfig.stateStore)); + } + + Constructor constructor; + try { + constructor = customStateStoreClazz.getConstructor(Map.class, SpoutConfig.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException(String.format("Invalid custom state store class: %s. 
" + + "Must define a constructor with two parameters: Map conf, SpoutConfig spoutConfig", + spoutConfig.stateStore)); + } + + Object customStateStoreObj; + try { + customStateStoreObj= constructor.newInstance(conf, spoutConfig); + } catch (InstantiationException e) { + throw new RuntimeException(String.format("Failed to instantiate custom state store class: %s due to InstantiationException.", + spoutConfig.stateStore), e); + } catch (IllegalAccessException e) { + throw new RuntimeException(String.format("Failed to instantiate custom state store class: %s due to IllegalAccessException.", + spoutConfig.stateStore), e); + } catch (InvocationTargetException e) { + throw new RuntimeException(String.format("Failed to instantiate custom state store class: %s due to InvocationTargetException.", + spoutConfig.stateStore), e); + } + + assert (customStateStoreObj instanceof StateStore); + return (StateStore)customStateStoreObj; + } + + public PartitionStateManagerFactory(Map stormConf, SpoutConfig spoutConfig) { + + // default to original storm storage format + if (Strings.isNullOrEmpty(spoutConfig.stateStore) || STATE_STORE_ZOOKEEPER.equals(spoutConfig.stateStore)) { + _stateStore = createZkStateStore(stormConf, spoutConfig); + LOG.info("Created Zookeeper backed state store."); + + } else if (STATE_STORE_KAFKA.equals(spoutConfig.stateStore)) { + _stateStore = createKafkaStateStore(stormConf, spoutConfig); + LOG.info("Created Kafka backed state store."); + + } else { + _stateStore = createCustomStateStore(stormConf, spoutConfig, spoutConfig.stateStore); + LOG.info("Created custom state store implemented by {}.", spoutConfig.stateStore); + } + } + + public PartitionStateManager getInstance(Partition partition) { + return new PartitionStateManager(partition, _stateStore); + } + + public void close() { + if (_stateStore != null) { + IOUtils.closeQuietly(_stateStore); + } + } +} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java index 1ac41c8d211..5c55df24f20 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java @@ -22,14 +22,25 @@ public class SpoutConfig extends KafkaConfig implements Serializable { + + public static final String STATE_STORE_ZOOKEEPER = "zookeeper"; + public static final String STATE_STORE_KAFKA = "kafka"; + + /** a list of ZooKeeper servers */ public List zkServers = null; + + /** ZooKeeper port */ public Integer zkPort = null; + + /** ZooKeeper context root */ public String zkRoot = null; + + /** Spout id which is also used to track the Kafka offsets */ public String id = null; public String outputStreamId; - // setting for how often to save the current kafka offset to ZooKeeper + /** setting for how often to save the current kafka offset to ZooKeeper */ public long stateUpdateIntervalMs = 2000; // Exponential back-off retry settings. These are used when retrying messages after a bolt @@ -38,6 +49,20 @@ public class SpoutConfig extends KafkaConfig implements Serializable { public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000; + /** offset state information storage. 
valid options are zookeeper and kafka */ + public String stateStore = STATE_STORE_ZOOKEEPER; + + /** timeout in millis for state read/write operations */ + public int stateOpTimeout = 5000; + + /** max retries allowed for state read/write operations */ + public int stateOpMaxRetry = 3; + + public SpoutConfig(BrokerHosts hosts, String topic, String id) { + super(hosts, topic); + this.id = id; + } + public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StateStore.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StateStore.java new file mode 100644 index 00000000000..60103bf7d29 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StateStore.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import java.io.Closeable; +import java.util.Map; + +/** + * Abstraction of a partition state storage. + *

+ * <p>
+ * The partition state is usually kept in JSON format in the store and in Map format in runtime memory. An example
+ * is shown below:
+ * </p>
+ *
+ * <p>
+ * JSON:
+ * <pre>
+ * {
+ *     "broker": {
+ *         "host": "kafka.sample.net",
+ *         "port": 9092
+ *     },
+ *     "offset": 4285,
+ *     "partition": 1,
+ *     "topic": "testTopic",
+ *     "topology": {
+ *         "id": "fce905ff-25e0 -409e-bc3a-d855f 787d13b",
+ *         "name": "Test Topology"
+ *     }
+ * }
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * Memory:
+ * <pre>
+ * Map data = (Map) ImmutableMap.builder()
+ *         .put("topology", ImmutableMap.of(
+ *                 "id", "fce905ff-25e0 -409e-bc3a-d855f 787d13b",
+ *                 "name", "Test Topology"))
+ *         .put("offset", 4285)
+ *         .put("partition", 1)
+ *         .put("broker", ImmutableMap.of(
+ *                 "host", "kafka.sample.net",
+ *                 "port", 9092))
+ *         .put("topic", "testTopic").build();
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * Users can create their own custom state store by implementing this interface and registering it with
+ * {@link SpoutConfig#stateStore}. The implementation class must also provide a public constructor
+ * that takes two arguments:
+ * </p>
+ *
+ * <pre>
+ * public CustomStateStore(Map stormConf, SpoutConfig spoutConfig)
+ * </pre>
+ *
+ * <p>
+ * See the {@link KafkaStateStore} class as an example.
+ * </p>
+ */ +public interface StateStore extends Closeable { + + Map readState(Partition p); + + void writeState(Partition p, Map state); +} \ No newline at end of file diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index bdbc44d2b15..00602490b5f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -23,16 +23,18 @@ public class StaticCoordinator implements PartitionCoordinator { - Map _managers = new HashMap(); - List _allManagers = new ArrayList(); - public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - StaticHosts hosts = (StaticHosts) config.hosts; + private Map _managers = new HashMap(); + private List _allManagers = new ArrayList(); + + public StaticCoordinator(DynamicPartitionConnections connections, PartitionStateManagerFactory partitionStateManagerFactory, Map stormConf, SpoutConfig spoutConfig, int taskIndex, int totalTasks, String topologyInstanceId) { + StaticHosts hosts = (StaticHosts) spoutConfig.hosts; List partitions = new ArrayList(); partitions.add(hosts.getPartitionInformation()); List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); + for (Partition myPartition : myPartitions) { - _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); + _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, partitionStateManagerFactory.getInstance(myPartition), stormConf, spoutConfig, myPartition)); } _allManagers = new ArrayList(_managers.values()); } @@ -48,5 +50,4 @@ public PartitionManager getManager(Partition partition) { @Override public void refresh() { return; } - } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index a53d5660a04..fa35626a971 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -26,7 +26,7 @@ import static org.apache.storm.kafka.KafkaUtils.taskId; public class ZkCoordinator implements PartitionCoordinator { - public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); + private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); SpoutConfig _spoutConfig; int _taskIndex; @@ -34,25 +34,25 @@ public class ZkCoordinator implements PartitionCoordinator { String _topologyInstanceId; Map _managers = new HashMap(); List _cachedList = new ArrayList(); + PartitionStateManagerFactory _partitionStateManagerFactory; Long _lastRefreshTime = null; int _refreshFreqMs; DynamicPartitionConnections _connections; DynamicBrokersReader _reader; - ZkState _state; Map _stormConf; - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); + public ZkCoordinator(DynamicPartitionConnections connections, PartitionStateManagerFactory partitionStateManagerFactory, Map stormConf, SpoutConfig spoutConfig, int 
taskIndex, int totalTasks, String topologyInstanceId) { + this(connections, partitionStateManagerFactory, stormConf, spoutConfig, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { - _spoutConfig = spoutConfig; + public ZkCoordinator(DynamicPartitionConnections connections, PartitionStateManagerFactory partitionStateManagerFactory, Map stormConf, SpoutConfig spoutConfig, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _connections = connections; + _partitionStateManagerFactory = partitionStateManagerFactory; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; - _state = state; + _spoutConfig = spoutConfig; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; @@ -95,7 +95,7 @@ public void refresh() { LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { - PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); + PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _partitionStateManagerFactory.getInstance(id), _stormConf, _spoutConfig, id); _managers.put(id, man); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkStateStore.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkStateStore.java new file mode 100644 index 00000000000..ba4c5857a8f --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkStateStore.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.CreateMode; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +public class ZkStateStore implements StateStore { + private static final Logger LOG = LoggerFactory.getLogger(ZkStateStore.class); + + private ZkStateStoreConfig _config; + private CuratorFramework _curator; + + private CuratorFramework newCurator(ZkStateStoreConfig config) throws Exception { + LOG.info("Creating new curator framework on {}.", config.getZkServerPorts()); + return CuratorFrameworkFactory.newClient(config.getZkServerPorts(), + config.getSessionTimeout(), config.getConnectionTimeout(), + new RetryNTimes(config.getRetryTimes(), config.getRetryInterval())); + } + + public CuratorFramework getCurator() { + assert _curator != null; + return _curator; + } + + public ZkStateStore(ZkStateStoreConfig config) { + this._config = config; + try { + _curator = newCurator(_config); + _curator.start(); + LOG.info("Started curator framework."); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public ZkStateStore(Map conf, SpoutConfig spoutConfig) { + this(new ZkStateStoreConfig(conf, spoutConfig)); + } + + @Override + public void writeState(Partition p, Map state) { + String zkPath = committedPath(p); + LOG.debug("Writing to {} with state data {} for partition {}:{}.", zkPath, state, p.host, p.partition); + write(zkPath, JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"))); + } + + @Override + public Map readState(Partition p) { + String zkPath = committedPath(p); + LOG.debug("Reading state data from {} for partition {}:{}.", zkPath, p.host, p.partition); + try { + byte[] b = read(zkPath); + if (b == null) { + LOG.warn("No state found for partition {}:{} at this time.", p.host, p.partition); + return null; + } + Map state = (Map) JSONValue.parse(new String(b, "UTF-8")); + LOG.debug("Retrieved state {} from {} for partition {}:{}.", state, zkPath, p.host, p.partition); + return state; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + _curator.close(); + _curator = null; + LOG.info("Closed curator framework."); + } + + private String committedPath(Partition partition) { + return _config.getZkRoot() + "/" + _config.getConsumerId() + "/" + partition.getId(); + } + + private void write(String path, byte[] bytes) { + try { + if (_curator.checkExists().forPath(path) == null) { + _curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, bytes); + } else { + _curator.setData().forPath(path, bytes); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private byte[] read(String path) { + try { + if (_curator.checkExists().forPath(path) != null) { + return _curator.getData().forPath(path); + } else { + return null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static class ZkStateStoreConfig { + private final String zkServerPorts; + private final String zkRoot; + private final String consumerId; + private final int connectionTimeout; + private final int sessionTimeout; + private final int retryTimes; + private final int
retryInterval; + + public ZkStateStoreConfig(Map conf, SpoutConfig spoutConfig) { + List zkServers = spoutConfig.zkServers; + if (zkServers == null) { + zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + } + + Integer zkPort = spoutConfig.zkPort; + if (zkPort == null) { + zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); + } + + String serverPorts = ""; + for (String server : zkServers) { + serverPorts = serverPorts + server + ":" + zkPort + ","; + } + + this.zkServerPorts = serverPorts; + this.zkRoot = spoutConfig.zkRoot; + this.consumerId = spoutConfig.id; + this.connectionTimeout = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)); + this.sessionTimeout = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)); + this.retryTimes = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)); + this.retryInterval = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)); + } + + public String getZkServerPorts() { + return zkServerPorts; + } + + public String getZkRoot() { + return zkRoot; + } + + public String getConsumerId() { + return consumerId; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public int getSessionTimeout() { + return sessionTimeout; + } + + public int getRetryTimes() { + return retryTimes; + } + + public int getRetryInterval() { + return retryInterval; + } + } +} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java index 9732c8c57b4..1ecb4152d81 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java @@ -41,7 +41,7 @@ public class TridentKafkaEmitter { - public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class); + private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class); private DynamicPartitionConnections _connections; private String _topologyName; @@ -100,7 +100,7 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition if (lastTopoMeta != null) { lastInstanceId = (String) lastTopoMeta.get("id"); } - if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) { + if ((_config.ignoreStoredOffsets || _config.ignoreZkOffsets) && !_topologyInstanceId.equals(lastInstanceId)) { offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime); } else { offset = (Long) lastMeta.get("nextOffset"); @@ -157,7 +157,7 @@ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition pa private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) { LOG.info("re-emitting batch, attempt " + attempt); String instanceId = (String) meta.get("instanceId"); - if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) { + if (!(_config.ignoreStoredOffsets || _config.ignoreZkOffsets) || instanceId.equals(_topologyInstanceId)) { SimpleConsumer consumer = _connections.register(partition); long offset = (Long) meta.get("offset"); long nextOffset = (Long) meta.get("nextOffset"); diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java index d26c341da8a..abc10db3d7d 100644 --- 
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
@@ -29,7 +29,7 @@
 public class ZkBrokerReader implements IBrokerReader {
 
-    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
 
     List cachedBrokers = new ArrayList();
     DynamicBrokersReader reader;
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaStateStoreTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaStateStoreTest.java
new file mode 100644
index 00000000000..6af2a1a3643
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaStateStoreTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaStateStoreTest {
+
+    private KafkaTestBroker testBroker;
+    private KafkaStateStore stateStore;
+
+    @Before
+    public void setUp() throws Exception {
+        String testTopic = "testTopic";
+
+        TestingServer server = new TestingServer();
+        testBroker = new KafkaTestBroker(server, 0);
+        String connectionString = server.getConnectString();
+
+        Properties props = new Properties();
+        props.put("metadata.broker.list", testBroker.getBrokerConnectionString());
+        Producer p = new Producer<>(new ProducerConfig(props));
+        KeyedMessage msg = new KeyedMessage<>(testTopic, "test message".getBytes());
+        p.send(msg);
+
+        ZkHosts hosts = new ZkHosts(connectionString);
+        SpoutConfig spoutConfig = new SpoutConfig(hosts, testTopic, "/", "testConsumerGroup");
+
+        Map stormConf = new HashMap();
+
+        stateStore = new KafkaStateStore(stormConf, spoutConfig);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        testBroker.shutdown();
+    }
+
+    @Test
+    public void testStoreReadWrite() {
+        Partition testPartition = new Partition(new Broker("localhost", testBroker.getPort()), "testTopic", 0);
+
+        Map broker = ImmutableMap.of("host", "kafka.sample.net", "port", 9100L);
+        Map topology = ImmutableMap.of("id", "fce905ff-25e0-409e-bc3a-d855f787d13b", "name", "Test Topology");
+        Map testState = ImmutableMap.of("broker", broker, "offset", 4285L, "partition", 1L, "topic", "testTopic", "topology", topology);
+
+        stateStore.writeState(testPartition, testState);
+        Map state = stateStore.readState(testPartition);
+
+        assertEquals("kafka.sample.net", ((Map) state.get("broker")).get("host"));
+        assertEquals(9100L, ((Map) state.get("broker")).get("port"));
+        assertEquals(4285L, state.get("offset"));
+        assertEquals(1L, state.get("partition"));
+        assertEquals("testTopic", state.get("topic"));
+        assertEquals("fce905ff-25e0-409e-bc3a-d855f787d13b", ((Map) state.get("topology")).get("id"));
+        assertEquals("Test Topology", ((Map) state.get("topology")).get("name"));
+    }
+}
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
index e2fb60f5a03..75d03480c2c 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
@@ -45,15 +45,19 @@ public class KafkaTestBroker {
     private File logDir;
 
     public KafkaTestBroker() {
+        this(null, 0);
+    }
+
+    public KafkaTestBroker(TestingServer s, int brokerId) {
         try {
-            server = new TestingServer();
+            server = (s == null) ? new TestingServer() : s;
             String zookeeperConnectionString = server.getConnectString();
             ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
             zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
             zookeeper.start();
             port = InstanceSpec.getRandomPort();
             logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
-            KafkaConfig config = buildKafkaConfig(zookeeperConnectionString);
+            KafkaConfig config = buildKafkaConfig(zookeeperConnectionString, brokerId);
             kafka = new KafkaServerStartable(config);
             kafka.startup();
         } catch (Exception ex) {
@@ -61,12 +65,14 @@ public KafkaTestBroker() {
     }
 
-    private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
+    private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString, int brokerId) {
         Properties p = new Properties();
         p.setProperty("zookeeper.connect", zookeeperConnectionString);
-        p.setProperty("broker.id", "0");
+        p.setProperty("broker.id", String.valueOf(brokerId));
         p.setProperty("port", "" + port);
         p.setProperty("log.dirs", logDir.getAbsolutePath());
+        // default the offsets topic replication factor to 1 so the offset manager can create its internal topic on a single test broker
+        p.setProperty("offsets.topic.replication.factor", "1");
         return new KafkaConfig(p);
     }
 
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
index 9da6c0a5187..85756d999af 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -117,7 +117,7 @@ public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exc
 
     @Test
     public void getOffsetFromConfigAndDontForceFromStart() {
-        config.ignoreZkOffsets = false;
+        config.ignoreStoredOffsets = false;
         config.startOffsetTime = OffsetRequest.EarliestTime();
         createTopicAndSendMessage();
         long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
@@ -127,7 +127,8 @@ public void getOffsetFromConfigAndDontForceFromStart() {
 
     @Test
     public void getOffsetFromConfigAndFroceFromStart() {
-        config.ignoreZkOffsets = true;
+        config.ignoreStoredOffsets = true;
         config.startOffsetTime = OffsetRequest.EarliestTime();
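+        // with ignoreStoredOffsets set, the spout must skip stored consumer state and start from startOffsetTime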
         createTopicAndSendMessage();
         long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionStateManagerFactoryTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionStateManagerFactoryTest.java
new file mode 100644
index 00000000000..4966ec3f672
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionStateManagerFactoryTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class PartitionStateManagerFactoryTest {
+
+    PartitionStateManagerFactory factory;
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @Test
+    public void testCustomStateStore() throws Exception {
+        Map stormConfig = new HashMap();
+        SpoutConfig spoutConfig = new SpoutConfig(null, null, null);
+        spoutConfig.stateStore = TestStateStore.class.getName();
+
+        factory = new PartitionStateManagerFactory(stormConfig, spoutConfig);
+
+        PartitionStateManager partitionStateManager = factory.getInstance(null);
+
+        assertEquals(TestStateStore.MAGIC_STATE, partitionStateManager.getState());
+    }
+
+    public static class TestStateStore implements StateStore {
+
+        public static final Map MAGIC_STATE = ImmutableMap.of(new Object(), new Object());
+
+        public TestStateStore(Map stormConf, SpoutConfig spoutConfig) {}
+
+        @Override
+        public Map readState(Partition p) {
+            return MAGIC_STATE;
+        }
+
+        @Override
+        public void writeState(Partition p, Map state) {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+}
\ No newline at end of file
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZKStateStoreTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZKStateStoreTest.java
new file mode 100644
index 00000000000..39b1b74c338
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZKStateStoreTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.test.TestingServer;
+import org.apache.storm.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZKStateStoreTest {
+
+    private TestingServer server;
+    private ZkStateStore stateStore;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        server = new TestingServer();
+        String connectionString = server.getConnectString();
+        ZkHosts hosts = new ZkHosts(connectionString);
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
+        spoutConfig.zkServers = Arrays.asList("localhost");
+        spoutConfig.zkPort = server.getPort();
+
+        Map stormConf = new HashMap<>();
+        stormConf.put(Config.STORM_ZOOKEEPER_PORT, spoutConfig.zkPort);
+        stormConf.put(Config.STORM_ZOOKEEPER_SERVERS, spoutConfig.zkServers);
+        stormConf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        stormConf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
+        stormConf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        stormConf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+
+        stateStore = new ZkStateStore(stormConf, spoutConfig);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        stateStore.close();
+        server.close();
+    }
+
+    @Test
+    public void testStoreReadWrite() {
+        Partition testPartition = new Partition(new Broker("localhost", 9100), "testTopic", 1);
+
+        Map broker = ImmutableMap.of("host", "kafka.sample.net", "port", 9100L);
+        Map topology = ImmutableMap.of("id", "fce905ff-25e0-409e-bc3a-d855f787d13b", "name", "Test Topology");
+        Map testState = ImmutableMap.of("broker", broker, "offset", 4285L, "partition", 1L, "topic", "testTopic", "topology", topology);
+
+        stateStore.writeState(testPartition, testState);
+        Map state = stateStore.readState(testPartition);
+
+        assertEquals("kafka.sample.net", ((Map) state.get("broker")).get("host"));
+        assertEquals(9100L, ((Map) state.get("broker")).get("port"));
+        assertEquals(4285L, state.get("offset"));
+        assertEquals(1L, state.get("partition"));
+        assertEquals("testTopic", state.get("topic"));
+        assertEquals("fce905ff-25e0-409e-bc3a-d855f787d13b", ((Map) state.get("topology")).get("id"));
+        assertEquals("Test Topology", ((Map) state.get("topology")).get("name"));
+    }
+}
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 65bf0b45ee1..eff6c2e17e6 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -47,34 +47,36 @@ public class ZkCoordinatorTest {
     private TestingServer server;
     private Map stormConf = new HashMap();
    private SpoutConfig spoutConfig;
-    private ZkState state;
     private SimpleConsumer simpleConsumer;
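+    // partition state is now resolved through the pluggable PartitionStateManagerFactory rather than a fixed ZkState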
+    private PartitionStateManagerFactory partitionStateManagerFactory;
 
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
+
         server = new TestingServer();
         String connectionString = server.getConnectString();
         ZkHosts hosts = new ZkHosts(connectionString);
         hosts.refreshFreqSecs = 1;
+
         spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
-        Map conf = buildZookeeperConfig(server);
-        state = new ZkState(conf);
+        spoutConfig.zkServers = Arrays.asList("localhost");
+        spoutConfig.zkPort = server.getPort();
+
+        stormConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
+        stormConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+        stormConf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        stormConf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
+        stormConf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        stormConf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+
+        partitionStateManagerFactory = new PartitionStateManagerFactory(stormConf, spoutConfig);
+
         simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
         when(dynamicPartitionConnections.register(any(Broker.class), any(String.class), anyInt())).thenReturn(simpleConsumer);
     }
 
-    private Map buildZookeeperConfig(TestingServer server) {
-        Map conf = new HashMap();
-        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
-        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
-        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
-        return conf;
-    }
-
     @After
     public void shutdown() throws Exception {
         simpleConsumer.close();
@@ -95,7 +96,6 @@ public void testOnePartitionPerTask() throws Exception {
         }
     }
 
-
     @Test
     public void testPartitionsChange() throws Exception {
         final int totalTasks = 64;
@@ -138,7 +138,7 @@ private void waitForRefresh() throws InterruptedException {
     private List buildCoordinators(int totalTasks) {
         List coordinatorList = new ArrayList();
         for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, partitionStateManagerFactory, stormConf, spoutConfig, i, totalTasks, "test-id", reader);
             coordinatorList.add(coordinator);
         }
         return coordinatorList;