diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 8f2867b0578..9f423532590 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -136,7 +136,7 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
+            <version>0.9.0.0</version>
             <scope>provided</scope>
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 49c7526b999..5a0014bc13a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -21,12 +21,14 @@
import backtype.storm.spout.RawMultiScheme;
import java.io.Serializable;
+import java.util.List;
public class KafkaConfig implements Serializable {
private static final long serialVersionUID = 5276718734571623855L;
public final BrokerHosts hosts;
public final String topic;
+    public final List<String> topics;
public final String clientId;
public int fetchSizeBytes = 1024 * 1024;
@@ -44,9 +46,21 @@ public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
}
+    public KafkaConfig(BrokerHosts hosts, List<String> topics) {
+        this(hosts, topics, kafka.api.OffsetRequest.DefaultClientId());
+    }
+
public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
this.hosts = hosts;
this.topic = topic;
+ this.topics = null;
+ this.clientId = clientId;
+ }
+
+    public KafkaConfig(BrokerHosts hosts, List<String> topics, String clientId) {
+ this.hosts = hosts;
+ this.topic = null;
+ this.topics = topics;
this.clientId = clientId;
}
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
new file mode 100644
index 00000000000..fc5cbecbe84
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class KafkaJavaApiSpout extends BaseRichSpout {
+
+
+ public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
+
+    // Tuples buffered for the current batch, keyed by Kafka offset so each tuple
+    // can be emitted with its offset as the Storm message id (without a message
+    // id, ack() and fail() would never be invoked for these tuples).
+    private final Map<Long, Values> messagesList = new LinkedHashMap<Long, Values>();
+    private KafkaConsumer<String, byte[]> consumer;
+    private Iterator<ConsumerRecord<String, byte[]>> it;
+    private Map<TopicPartition, OffsetAndMetadata> toBeCommitted;
+    private AtomicBoolean rebalanceFlag;
+
+
+ private int batchUpperLimit;
+ private int maxBatchDurationMillis;
+
+    private List<String> topicList;
+
+ /**
+ * Lock critical section of code to be executed by one thread
+ * at the same time in order not to get a corrupted state when
+ * stopping kafka source.
+ */
+ private Lock lock;
+
+ SpoutConfig _spoutConfig;
+ SpoutOutputCollector _collector;
+    ConcurrentMap<Long, Values> messages;
+ long pollTimeout;
+ long maxFailCount;
+
+ public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
+ lock = new ReentrantLock();
+ messages = new ConcurrentHashMap<>();
+ this._spoutConfig = spoutConfig;
+ }
+
+ @Override
+ public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+ _collector = collector;
+
+ LOG.debug("Opening KafkaJavaApiSpout...");
+
+        if (conf.containsKey(Config.KAFKA_POLL_TIMEOUT)) {
+            // Read via Number so both Integer and Long config values work.
+            pollTimeout = ((Number) conf.get(Config.KAFKA_POLL_TIMEOUT)).longValue();
+        } else {
+            pollTimeout = 1000;
+        }
+        if (conf.containsKey(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT)) {
+            maxFailCount = ((Number) conf.get(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT)).longValue();
+        } else {
+            maxFailCount = 5;
+        }
+
+        toBeCommitted = new HashMap<TopicPartition, OffsetAndMetadata>();
+ rebalanceFlag = new AtomicBoolean(false);
+
+        // Determine the topics to consume: the single-topic field if set,
+        // otherwise the multi-topic list from KafkaConfig.
+        topicList = _spoutConfig.topic == null ? _spoutConfig.topics : Collections.singletonList(_spoutConfig.topic);
+
+        if (topicList == null || topicList.isEmpty()) {
+            throw new KafkaException("At least one Kafka topic must be specified.");
+        }
+
+        batchUpperLimit = conf.containsKey(KafkaSpoutConstants.BATCH_SIZE)
+                ? ((Number) conf.get(KafkaSpoutConstants.BATCH_SIZE)).intValue()
+                : KafkaSpoutConstants.DEFAULT_BATCH_SIZE;
+        maxBatchDurationMillis = conf.containsKey(KafkaSpoutConstants.BATCH_DURATION_MS)
+                ? ((Number) conf.get(KafkaSpoutConstants.BATCH_DURATION_MS)).intValue()
+                : KafkaSpoutConstants.DEFAULT_BATCH_DURATION;
+
+        try {
+            // Build the consumer from the topology configuration, which carries the
+            // Kafka consumer properties (bootstrap servers, deserializers, group id).
+            consumer = new KafkaConsumer<String, byte[]>(conf);
+        } catch (Exception e) {
+            throw new KafkaException("Unable to create consumer. " +
+                    "Check whether the bootstrap servers are up and that " +
+                    "Storm can connect to them.", e);
+        }
+
+        // Subscribe by topic (rather than assigning partitions explicitly) so the
+        // rebalance listener can flag revocations and trigger an offset commit.
+        consumer.subscribe(topicList, new RebalanceListener(rebalanceFlag));
+
+ it = consumer.poll(pollTimeout).iterator();
+
+ LOG.info("Kafka spout started.");
+ }
+
+ @Override
+ public void nextTuple() {
+ LOG.debug("Polling next tuple...");
+ final String batchUUID = UUID.randomUUID().toString();
+ byte[] kafkaMessage;
+ String kafkaKey;
+
+ try {
+ // prepare time variables for new batch
+ final long batchStartTime = System.currentTimeMillis();
+ final long batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
+ while (messagesList.size() < batchUpperLimit &&
+ System.currentTimeMillis() < batchEndTime) {
+
+                if (it == null || !it.hasNext()) {
+                    // Fetch more records; the poll timeout is the time remaining
+                    // in the current batch window.
+                    ConsumerRecords<String, byte[]> records = consumer.poll(
+                            Math.max(0, batchEndTime - System.currentTimeMillis()));
+                    it = records.iterator();
+
+                    // The rebalance listener sets this flag when partitions are revoked;
+                    // break out so buffered tuples are emitted and their offsets committed.
+                    if (rebalanceFlag.get()) {
+                        rebalanceFlag.set(false);
+                        break;
+                    }
+                    // The poll returned nothing: back off, the batch window is spent.
+                    if (!it.hasNext()) {
+                        LOG.debug("Returning with backoff. No more data to read");
+                        break;
+                    }
+ }
+
+ // get next message
+                ConsumerRecord<String, byte[]> message = it.next();
+ kafkaMessage = message.value();
+ kafkaKey = message.key();
+
+ LOG.debug("Message: {}", new String(kafkaMessage));
+ LOG.debug("Topic: {} Partition: {}", message.topic(), message.partition());
+
+ Values value = new Values(kafkaKey, kafkaMessage, message.topic(), maxFailCount);
+                messagesList.put(message.offset(), value);
+ messages.putIfAbsent(message.offset(), value);
+
+ LOG.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
+ LOG.debug("Messages #: {}", messagesList.size());
+
+
+ long offset = message.offset() + 1;
+ toBeCommitted.put(new TopicPartition(message.topic(), message.partition()),
+ new OffsetAndMetadata(offset, batchUUID));
+ }
+
+            if (messagesList.size() > 0) {
+
+                // Emit each tuple anchored to its offset so ack()/fail()
+                // are invoked with the offset as the message id.
+                for (Map.Entry<Long, Values> entry : messagesList.entrySet()) {
+                    _collector.emit(entry.getValue(), entry.getKey());
+                }
+
+                LOG.debug("Emitted {}", messagesList.size());
+
+                messagesList.clear();
+                // The commit must not be interrupted by a concurrent close().
+ try {
+ lock.lock();
+ consumer.commitSync(toBeCommitted);
+ toBeCommitted.clear();
+ } finally {
+ lock.unlock();
+ }
+ }
+        } catch (Exception e) {
+            // Pass the throwable as the last argument (no placeholder) so slf4j logs the stack trace.
+            LOG.error("KafkaJavaApiSpout exception", e);
+        }
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.debug("Message with offset {} failed", msgId);
+ Values message = messages.get(msgId);
+ Long currentAttempt = (Long) message.get(3);
+        if (currentAttempt < 1) {
+            LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", msgId);
+            // Drop the tuple from the pending map so it does not leak.
+            messages.remove(msgId);
+        } else {
+            message.set(3, currentAttempt - 1);
+            _collector.emit(message, msgId);
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.debug("Message with offset {} proceeded successfully", msgId);
+ messages.remove(msgId);
+ }
+
+ @Override
+ public void close() {
+ if (consumer != null) {
+ try {
+ lock.lock();
+ consumer.wakeup();
+ consumer.close();
+ } finally {
+ lock.unlock();
+ }
+ }
+ LOG.info("Kafka Spout stopped.");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("key", "message", "topic", "attempt"));
+ }
+
+    public static class KafkaSpoutConstants {
+
+ public static final String KAFKA_PREFIX = "kafka.";
+ public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+ public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+        public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+ public static final String BOOTSTRAP_SERVERS = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ public static final String TOPICS = KAFKA_PREFIX + "topics";
+ public static final String DEFAULT_AUTO_COMMIT = "false";
+ public static final String BATCH_SIZE = "batchSize";
+ public static final String BATCH_DURATION_MS = "batchDurationMillis";
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+ public static final int DEFAULT_BATCH_DURATION = 1000;
+
+ }
+}
+
+class RebalanceListener implements ConsumerRebalanceListener {
+ private static final Logger log = LoggerFactory.getLogger(RebalanceListener.class);
+ private AtomicBoolean rebalanceFlag;
+
+ public RebalanceListener(AtomicBoolean rebalanceFlag) {
+ this.rebalanceFlag = rebalanceFlag;
+ }
+
+    // Flag the rebalance so the spout commits the offsets it has already read
+    // before the new partition assignment takes effect.
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ for (TopicPartition partition : partitions) {
+ log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+ }
+ rebalanceFlag.set(true);
+ }
+
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ for (TopicPartition partition : partitions) {
+ log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+ }
+ }
+}
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 9db99773f82..9071ca0f8fd 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2066,6 +2066,21 @@ public class Config extends HashMap {
@isInteger
public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
+ /**
+     * Kafka spout config.
+     * The time, in milliseconds, spent waiting in poll if data is not available. If 0, the
+     * poll returns immediately with whatever records are currently available. Must not be negative.
+ */
+ @isInteger
+ public static final String KAFKA_POLL_TIMEOUT = "kafka.poll.timeout";
+
+ /**
+     * Kafka spout config.
+     * Maximum number of re-emit attempts per tuple; it is decremented each time fail() is
+     * called for the tuple.
+ */
+ @isInteger
+ public static final String KAFKA_MAX_FAIL_ATTEMPTS_COUNT = "kafka.max.fails";
+
public static void setClasspath(Map conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
}