diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index f7a387cb6e0..2d9a8445ab9 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -53,6 +53,20 @@
org.apache.kafkakafka-clients${storm.kafka.client.version}
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
@@ -66,6 +80,18 @@
4.11test
+
+ info.batey.kafka
+ kafka-unit
+ 0.6
+ test
+
+
+ org.slf4j
+ log4j-over-slf4j
+ ${log4j-over-slf4j.version}
+ test
+
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4389acb50e5..4f0780afa01 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout extends BaseRichSpout {
private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
private transient KafkaSpoutTuplesBuilder tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
- private transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+ transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
private transient Set emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
private transient Iterator> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
@@ -266,19 +266,22 @@ private void doSeekRetriableTopicPartitions() {
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
- kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset
+ kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to the offset immediately after the last committed offset
}
}
}
// ======== emit =========
private void emit() {
- emitTupleIfNotEmitted(waitingToEmit.next());
- waitingToEmit.remove();
+ while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+ waitingToEmit.remove();
+ }
}
- // emits one tuple per record
- private void emitTupleIfNotEmitted(ConsumerRecord record) {
+
+ //Emits one tuple per record
+ //@return true if tuple was emitted
+ private boolean emitTupleIfNotEmitted(ConsumerRecord record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
@@ -295,7 +298,9 @@ private void emitTupleIfNotEmitted(ConsumerRecord record) {
retryService.remove(msgId); // re-emitted hence remove from failed
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ return true;
}
+ return false;
}
private void commitOffsetsForAckedTuples() {
@@ -451,7 +456,7 @@ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
/**
* This class is not thread safe
*/
- private class OffsetEntry {
+ class OffsetEntry {
private final TopicPartition tp;
private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
* Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
@@ -479,7 +484,7 @@ public OffsetAndMetadata findNextCommitOffset() {
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
- if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
+ if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
@@ -487,8 +492,9 @@ public OffsetAndMetadata findNextCommitOffset() {
LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
break;
} else {
- LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
- break;
+ //Received a redundant ack. Ignore and continue processing.
+ LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
+ tp, currOffset, committedOffset);
}
}
@@ -532,6 +538,10 @@ public void commit(OffsetAndMetadata committedOffset) {
LOG.trace("{}", this);
}
+ long getCommittedOffset() {
+ return committedOffset;
+ }
+
public boolean isEmpty() {
return ackedMsgs.isEmpty();
}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
new file mode 100644
index 00000000000..8fa7b80c05d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.mockito.Mockito.*;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+ private class SpoutContext {
+ public KafkaSpout spout;
+ public SpoutOutputCollector collector;
+
+ public SpoutContext(KafkaSpout spout,
+ SpoutOutputCollector collector) {
+ this.spout = spout;
+ this.collector = collector;
+ }
+ }
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ void populateTopicData(String topicName, int msgCount) {
+ kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+ IntStream.range(0, msgCount).forEach(value -> {
+ KeyedMessage keyedMessage = new KeyedMessage<>(
+ topicName, Integer.toString(value),
+ Integer.toString(value));
+
+ kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
+ });
+ }
+
+ SpoutContext initializeSpout(int msgCount) {
+ populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+ int kafkaPort = kafkaUnitRule.getKafkaPort();
+
+ TopologyContext topology = mock(TopologyContext.class);
+ SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ Map conf = mock(Map.class);
+
+ KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+ spout.open(conf, topology, collector);
+ spout.activate();
+ return new SpoutContext(spout, collector);
+ }
+ /*
+ * Asserts that the next possible offset to commit or the committed offset is the provided offset.
+ * An offset that is ready to be committed is not guaranteed to be already committed.
+ */
+ private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) {
+
+ boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
+ OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
+ boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
+ assertTrue("Next offset: " +
+ entry.findNextCommitOffset() +
+ " OR current offset: " +
+ entry.getCommittedOffset() +
+ " must equal desired offset: " +
+ offset,
+ currentOffsetMatch | nextOffsetMatch);
+ }
+
+ @Test
+ public void shouldContinueWithSlowDoubleAcks() throws Exception {
+ int messageCount = 20;
+ SpoutContext context = initializeSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor