From 34ffb9562374ddd0c1c0c12834a7ae03d7e7cbab Mon Sep 17 00:00:00 2001 From: Rodolfo Ribeiro Date: Thu, 29 Jun 2017 12:06:20 -0300 Subject: [PATCH 1/5] [STORM-2607] Offset consumer + 1 --- .../java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index b802a52b830..00253d62326 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -52,7 +52,7 @@ public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { */ public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) { this.topicPart = topicPart; - this.offset = offset; + this.offset = offset + 1; this.emitted = emitted; } From c3721f574258059b38a010d61be7ec49215967ef Mon Sep 17 00:00:00 2001 From: Rodolfo Ribeiro Date: Thu, 29 Jun 2017 17:20:05 -0300 Subject: [PATCH 2/5] [STORM-2607] Fixed committed offset should be the next message. --- .../org/apache/storm/kafka/spout/KafkaSpoutMessageId.java | 2 +- .../org/apache/storm/kafka/spout/internal/OffsetManager.java | 4 +++- .../apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 00253d62326..b802a52b830 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -52,7 +52,7 @@ public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { */ public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) { this.topicPart = topicPart; - this.offset = offset + 1; + this.offset = offset; this.emitted = emitted; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index be0f55184f0..2beb0224ac7 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -118,6 +118,8 @@ public OffsetAndMetadata findNextCommitOffset() { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (nextCommitMsg != null) { + //The committed offset should be the next message + nextCommitOffset = nextCommitOffset + 1; nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); @@ -162,7 +164,7 @@ public long commit(OffsetAndMetadata committedOffset) { LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); - return numCommittedOffsets; + return numCommittedOffsets - 1;// The committed offset should be the next message } public long getCommittedOffset() { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 7f0973b6d2f..266631d09dd 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -114,7 +114,7 @@ private void verifyAllMessagesCommitted(long messageCount) { Map commits = commitCapture.getValue(); assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); - assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); } @Test From 5c72981616f20efadcba2d1181d8c8858306e129 Mon Sep 17 00:00:00 2001 From: Rodolfo Ribeiro Date: Thu, 29 Jun 2017 19:30:39 -0300 Subject: [PATCH 3/5] [STORM-2607] Fixed PR --- .../kafka/spout/internal/OffsetManager.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 2beb0224ac7..c7492bf3c41 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -76,15 +76,17 @@ public void addToEmitMsgs(long offset) { public OffsetAndMetadata findNextCommitOffset() { long currOffset; long nextCommitOffset = committedOffset; + long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == nextCommitOffset + 1) { // found the next offset to commit + if (currOffset == lastOffMessageOffset + 1) { // found the next offset to commit nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; - } else if (currOffset > nextCommitOffset + 1) { - if (emittedOffsets.contains(nextCommitOffset + 1)) { + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; + } else if (currOffset > lastOffMessageOffset + 1) { + if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + " It will be processed in a subsequent batch.", tp, currOffset); break; @@ -99,10 +101,11 @@ public OffsetAndMetadata findNextCommitOffset() { LOG.debug("Processed non contiguous offset." + " (committedOffset+1) is no longer part of the topic." + " Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); @@ -118,8 +121,6 @@ public OffsetAndMetadata findNextCommitOffset() { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (nextCommitMsg != null) { - //The committed offset should be the next message - nextCommitOffset = nextCommitOffset + 1; nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); @@ -141,7 +142,7 @@ public OffsetAndMetadata findNextCommitOffset() { */ public long commit(OffsetAndMetadata committedOffset) { final long preCommitCommittedOffsets = this.committedOffset; - final long numCommittedOffsets = committedOffset.offset() - this.committedOffset; + final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1; this.committedOffset = committedOffset.offset(); for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext();) { if (iterator.next().offset() <= committedOffset.offset()) { @@ -164,7 +165,7 @@ public long commit(OffsetAndMetadata committedOffset) { LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); - return numCommittedOffsets - 1;// The committed offset should be the next message + return numCommittedOffsets; } public long getCommittedOffset() { From f192ad2738b36c46dc7ca57c7e5518cd5b051abc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sat, 1 Jul 2017 17:29:56 +0200 Subject: [PATCH 4/5] STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API --- .../kafka/spout/internal/OffsetManager.java | 82 ++++++------ .../kafka/internal/OffsetManagerTest.java | 125 ++++++++++++++++++ 2 files changed, 169 insertions(+), 38 deletions(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index c7492bf3c41..c104348c69b 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -38,8 +38,8 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; - // Last offset committed to Kafka. Initially it is set to fetchOffset - 1 - private long committedOffset; + // Earliest uncommitted offset, i.e. the last committed offset + 1. Initially it is set to fetchOffset + private long earliestUncommittedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); // Acked messages sorted by ascending order of offset @@ -53,7 +53,7 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; - this.committedOffset = initialFetchOffset - 1; + this.earliestUncommittedOffset = initialFetchOffset; LOG.debug("Instantiated {}", this); } @@ -66,28 +66,28 @@ public void addToEmitMsgs(long offset) { } /** - * An offset is only committed when all records with lower offset have been + * An offset can only be committed when all emitted records with lower offset have been * acked. This guarantees that all offsets smaller than the committedOffset - * have been delivered. + * have been delivered, or that those offsets no longer exist in Kafka. + *

+ * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API. * * @return the next OffsetAndMetadata to commit, or null if no offset is * ready to commit. */ public OffsetAndMetadata findNextCommitOffset() { long currOffset; - long nextCommitOffset = committedOffset; - long lastOffMessageOffset = committedOffset; + long nextEarliestUncommittedOffset = earliestUncommittedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == lastOffMessageOffset + 1) { // found the next offset to commit + if (currOffset == nextEarliestUncommittedOffset) { // found the next offset to commit nextCommitMsg = currAckedMsg; - lastOffMessageOffset = currOffset; - nextCommitOffset = lastOffMessageOffset + 1; - } else if (currOffset > lastOffMessageOffset + 1) { - if (emittedOffsets.contains(lastOffMessageOffset + 1)) { - LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + nextEarliestUncommittedOffset = currOffset + 1; + } else if (currOffset > nextEarliestUncommittedOffset) { + if (emittedOffsets.contains(nextEarliestUncommittedOffset)) { + LOG.debug("topic-partition [{}] has non-contiguous offset [{}]." + " It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -98,32 +98,34 @@ public OffsetAndMetadata findNextCommitOffset() { the next logical point in the topic. Next logical offset should be the first element after committedOffset in the ascending ordered emitted set. */ - LOG.debug("Processed non contiguous offset." - + " (committedOffset+1) is no longer part of the topic." - + " Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); + LOG.debug("Processed non-contiguous offset." + + " The earliest uncommitted offset is no longer part of the topic." + + " Missing uncommitted offset: [{}], Processed: [{}]", nextEarliestUncommittedOffset, currOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextEarliestUncommittedOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { + LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset", + currOffset, nextEarliestUncommittedOffset); nextCommitMsg = currAckedMsg; - lastOffMessageOffset = currOffset; - nextCommitOffset = lastOffMessageOffset + 1; + nextEarliestUncommittedOffset = currOffset + 1; } else { - LOG.debug("topic-partition [{}] has non-continuous offset [{}]." - + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); + LOG.debug("topic-partition [{}] has non-contiguous offset [{}]." + + " Next Offset to commit should be [{}]", tp, currOffset, nextEarliestUncommittedOffset - 1); break; } } } else { //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", - tp, currOffset, committedOffset); + LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current earliest uncommitted offset [{}]", + tp, currOffset, earliestUncommittedOffset); } } OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (nextCommitMsg != null) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextEarliestUncommittedOffset, + nextCommitMsg.getMetadata(Thread.currentThread())); LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", - tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); + tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset()); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@ -132,20 +134,20 @@ public OffsetAndMetadata findNextCommitOffset() { } /** - * Marks an offset has committed. This method has side effects - it sets the + * Marks an offset as committed. This method has side effects - it sets the * internal state in such a way that future calls to - * {@link #findNextCommitOffset()} will return offsets greater than the + * {@link #findNextCommitOffset()} will return offsets greater than or equal to the * offset specified, if any. * - * @param committedOffset offset to be marked as committed + * @param earliestUncommittedOffset Earliest uncommitted offset. All lower offsets are expected to have been committed. * @return Number of offsets committed in this commit */ - public long commit(OffsetAndMetadata committedOffset) { - final long preCommitCommittedOffsets = this.committedOffset; - final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1; - this.committedOffset = committedOffset.offset(); + public long commit(OffsetAndMetadata earliestUncommittedOffset) { + final long preCommitEarliestUncommittedOffset = this.earliestUncommittedOffset; + final long numCommittedOffsets = earliestUncommittedOffset.offset() - this.earliestUncommittedOffset; + this.earliestUncommittedOffset = earliestUncommittedOffset.offset(); for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext();) { - if (iterator.next().offset() <= committedOffset.offset()) { + if (iterator.next().offset() < earliestUncommittedOffset.offset()) { iterator.remove(); } else { break; @@ -153,7 +155,7 @@ public long commit(OffsetAndMetadata committedOffset) { } for (Iterator iterator = emittedOffsets.iterator(); iterator.hasNext();) { - if (iterator.next() <= committedOffset.offset()) { + if (iterator.next() < earliestUncommittedOffset.offset()) { iterator.remove(); } else { break; @@ -163,13 +165,13 @@ public long commit(OffsetAndMetadata committedOffset) { LOG.trace("{}", this); LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", - preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); + preCommitEarliestUncommittedOffset, this.earliestUncommittedOffset - 1, numCommittedOffsets, tp); return numCommittedOffsets; } - public long getCommittedOffset() { - return committedOffset; + public long getEarliestUncommittedOffset() { + return earliestUncommittedOffset; } public boolean isEmpty() { @@ -183,13 +185,17 @@ public boolean contains(ConsumerRecord record) { public boolean contains(KafkaSpoutMessageId msgId) { return ackedMsgs.contains(msgId); } + + public boolean containsEmitted(long offset) { + return emittedOffsets.contains(offset); + } @Override public String toString() { return "OffsetManager{" + "topic-partition=" + tp + ", fetchOffset=" + initialFetchOffset - + ", committedOffset=" + committedOffset + + ", earliestUncommittedOffset=" + earliestUncommittedOffset + ", emittedOffsets=" + emittedOffsets + ", ackedMsgs=" + ackedMsgs + '}'; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java new file mode 100644 index 00000000000..553bbd5420a --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.internal; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.junit.Test; + +public class OffsetManagerTest { + + private final long initialFetchOffset = 0; + private final TopicPartition testTp = new TopicPartition("testTopic", 0); + private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset); + + @Test + public void testFindNextCommittedOffsetWithNoAcks() { + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue())); + } + + @Test + public void testFindNextCommitOffsetWithOneAck() { + /* + * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e. + * lastProcessedMessageOffset + 1. " + */ + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() { + emitAndAckMessage(getMessageId(initialFetchOffset + 1)); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2)); + } + + @Test + public void testFindNextCommitOffsetWithAckedOffsetGap() { + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + manager.addToEmitMsgs(initialFetchOffset + 1); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() { + /** + * If topic compaction is enabled in Kafka some offsets may be deleted. + * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking + * by checking that offsets in the gap have been emitted at some point previously. + * If they haven't then they can't exist in Kafka, since the spout emits tuples in order. + */ + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist", + nextCommitOffset.offset(), is(initialFetchOffset + 3)); + } + + @Test + public void testFindNextCommitOffsetWithUnackedOffsetGap() { + manager.addToEmitMsgs(initialFetchOffset + 1); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() { + OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10); + emitAndAckMessage(getMessageId(0)); + OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(); + assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue())); + } + + @Test + public void testCommit() { + emitAndAckMessage(getMessageId(initialFetchOffset)); + emitAndAckMessage(getMessageId(initialFetchOffset + 1)); + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + + long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2)); + + assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L)); + assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false)); + assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false)); + assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false)); + assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false)); + assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true)); + assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true)); + } + + private KafkaSpoutMessageId getMessageId(long offset) { + return new KafkaSpoutMessageId(new ConsumerRecord<>(testTp.topic(), testTp.partition(), offset, null, null)); + } + + private void emitAndAckMessage(KafkaSpoutMessageId msgId) { + manager.addToEmitMsgs(msgId.offset()); + manager.addToAckMsgs(msgId); + } +} From da7f49b94502914af769d7f40929bd54d3fae865 Mon Sep 17 00:00:00 2001 From: Rodolfo Ribeiro Date: Tue, 4 Jul 2017 19:13:05 -0300 Subject: [PATCH 5/5] [STORM-2607] Fixed PR --- .../org/apache/storm/kafka/spout/internal/OffsetManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index c104348c69b..fadf0f14344 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -125,7 +125,7 @@ public OffsetAndMetadata findNextCommitOffset() { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextEarliestUncommittedOffset, nextCommitMsg.getMetadata(Thread.currentThread())); LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", - tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset()); + tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); }