diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index be0f55184f0..fadf0f14344 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -38,8 +38,8 @@ public class OffsetManager {
     /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
      * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
     private final long initialFetchOffset;
-    // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
-    private long committedOffset;
+    // Earliest uncommitted offset, i.e. the last committed offset + 1. Initially it is set to fetchOffset
+    private long earliestUncommittedOffset;
     // Emitted Offsets List
     private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
     // Acked messages sorted by ascending order of offset
@@ -53,7 +53,7 @@ public class OffsetManager {
     public OffsetManager(TopicPartition tp, long initialFetchOffset) {
         this.tp = tp;
         this.initialFetchOffset = initialFetchOffset;
-        this.committedOffset = initialFetchOffset - 1;
+        this.earliestUncommittedOffset = initialFetchOffset;
         LOG.debug("Instantiated {}", this);
     }
 
@@ -66,26 +66,28 @@ public void addToEmitMsgs(long offset) {
     }
 
     /**
-     * An offset is only committed when all records with lower offset have been
+     * An offset can only be committed when all emitted records with lower offset have been
      * acked. This guarantees that all offsets smaller than the committedOffset
-     * have been delivered.
+     * have been delivered, or that those offsets no longer exist in Kafka.
+     * <p/>

+     * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API.
      *
      * @return the next OffsetAndMetadata to commit, or null if no offset is
      * ready to commit.
      */
     public OffsetAndMetadata findNextCommitOffset() {
         long currOffset;
-        long nextCommitOffset = committedOffset;
+        long nextEarliestUncommittedOffset = earliestUncommittedOffset;
         KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
 
         for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
             currOffset = currAckedMsg.offset();
-            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
+            if (currOffset == nextEarliestUncommittedOffset) {   // found the next offset to commit
                 nextCommitMsg = currAckedMsg;
-                nextCommitOffset = currOffset;
-            } else if (currOffset > nextCommitOffset + 1) {
-                if (emittedOffsets.contains(nextCommitOffset + 1)) {
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]."
+                nextEarliestUncommittedOffset = currOffset + 1;
+            } else if (currOffset > nextEarliestUncommittedOffset) {
+                if (emittedOffsets.contains(nextEarliestUncommittedOffset)) {
+                    LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
                         + " It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
@@ -96,31 +98,34 @@ public OffsetAndMetadata findNextCommitOffset() {
                     the next logical point in the topic. Next logical offset should be the
                     first element after committedOffset in the ascending ordered emitted set.
                     */
-                    LOG.debug("Processed non contiguous offset."
-                        + " (committedOffset+1) is no longer part of the topic."
-                        + " Committed: [{}], Processed: [{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+                    LOG.debug("Processed non-contiguous offset."
+                        + " The earliest uncommitted offset is no longer part of the topic."
+                        + " Missing uncommitted offset: [{}], Processed: [{}]", nextEarliestUncommittedOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextEarliestUncommittedOffset);
                     if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+                        LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
+                            currOffset, nextEarliestUncommittedOffset);
                         nextCommitMsg = currAckedMsg;
-                        nextCommitOffset = currOffset;
+                        nextEarliestUncommittedOffset = currOffset + 1;
                     } else {
-                        LOG.debug("topic-partition [{}] has non-continuous offset [{}]."
-                            + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+                        LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
+                            + " Next Offset to commit should be [{}]", tp, currOffset, nextEarliestUncommittedOffset - 1);
                         break;
                     }
                 }
             } else {
                 //Received a redundant ack. Ignore and continue processing.
-                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
-                    tp, currOffset, committedOffset);
+                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current earliest uncommitted offset [{}]",
+                    tp, currOffset, earliestUncommittedOffset);
             }
         }
 
         OffsetAndMetadata nextCommitOffsetAndMetadata = null;
         if (nextCommitMsg != null) {
-            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextEarliestUncommittedOffset,
+                nextCommitMsg.getMetadata(Thread.currentThread()));
             LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
-                tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+                tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1);
         } else {
             LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
         }
@@ -129,20 +134,20 @@ public OffsetAndMetadata findNextCommitOffset() {
     }
 
     /**
-     * Marks an offset has committed. This method has side effects - it sets the
+     * Marks an offset as committed. This method has side effects - it sets the
      * internal state in such a way that future calls to
-     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * {@link #findNextCommitOffset()} will return offsets greater than or equal to the
      * offset specified, if any.
      *
-     * @param committedOffset offset to be marked as committed
+     * @param earliestUncommittedOffset Earliest uncommitted offset. All lower offsets are expected to have been committed.
     * @return Number of offsets committed in this commit
     */
-    public long commit(OffsetAndMetadata committedOffset) {
-        final long preCommitCommittedOffsets = this.committedOffset;
-        final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
-        this.committedOffset = committedOffset.offset();
+    public long commit(OffsetAndMetadata earliestUncommittedOffset) {
+        final long preCommitEarliestUncommittedOffset = this.earliestUncommittedOffset;
+        final long numCommittedOffsets = earliestUncommittedOffset.offset() - this.earliestUncommittedOffset;
+        this.earliestUncommittedOffset = earliestUncommittedOffset.offset();
         for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
-            if (iterator.next().offset() <= committedOffset.offset()) {
+            if (iterator.next().offset() < earliestUncommittedOffset.offset()) {
                 iterator.remove();
             } else {
                 break;
@@ -150,7 +155,7 @@ public long commit(OffsetAndMetadata committedOffset) {
         }
 
         for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
-            if (iterator.next() <= committedOffset.offset()) {
+            if (iterator.next() < earliestUncommittedOffset.offset()) {
                 iterator.remove();
             } else {
                 break;
@@ -160,13 +165,13 @@ public long commit(OffsetAndMetadata committedOffset) {
         }
 
         LOG.trace("{}", this);
 
         LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
-            preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp);
+            preCommitEarliestUncommittedOffset, this.earliestUncommittedOffset - 1, numCommittedOffsets, tp);
 
         return numCommittedOffsets;
     }
 
-    public long getCommittedOffset() {
-        return committedOffset;
+    public long getEarliestUncommittedOffset() {
+        return earliestUncommittedOffset;
     }
 
     public boolean isEmpty() {
@@ -180,13 +185,17 @@ public boolean contains(ConsumerRecord record) {
     public boolean contains(KafkaSpoutMessageId msgId) {
         return ackedMsgs.contains(msgId);
     }
+
+    public boolean containsEmitted(long offset) {
+        return emittedOffsets.contains(offset);
+    }
 
     @Override
     public String toString() {
         return "OffsetManager{"
             + "topic-partition=" + tp
             + ", fetchOffset=" + initialFetchOffset
-            + ", committedOffset=" + committedOffset
+            + ", earliestUncommittedOffset=" + earliestUncommittedOffset
             + ", emittedOffsets=" + emittedOffsets
             + ", ackedMsgs=" + ackedMsgs
             + '}';
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java
new file mode 100644
index 00000000000..553bbd5420a
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/internal/OffsetManagerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.internal;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.junit.Test;
+
+public class OffsetManagerTest {
+
+    private final long initialFetchOffset = 0;
+    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
+    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);
+
+    @Test
+    public void testFindNextCommittedOffsetWithNoAcks() {
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithOneAck() {
+        /*
+         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e.
+         * lastProcessedMessageOffset + 1."
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset",
+            nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should be one past the processed message offset",
+            nextCommitOffset.offset(), is(initialFetchOffset + 2));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithAckedOffsetGap() {
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the contiguously acked offsets",
+            nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
+        /*
+         * If topic compaction is enabled in Kafka some offsets may be deleted.
+         * We distinguish this case from regular gaps in the acked offset sequence caused by out-of-order acking
+         * by checking that offsets in the gap have been emitted at some point previously.
+         * If they haven't then they can't exist in Kafka, since the spout emits tuples in order.
+         */
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
+            nextCommitOffset.offset(), is(initialFetchOffset + 3));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        assertThat("The next commit offset should cover the contiguously acked offsets",
+            nextCommitOffset.offset(), is(initialFetchOffset + 1));
+    }
+
+    @Test
+    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
+        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
+        //Ack on the manager under test, not the shared "manager" field, so the too-low ack actually reaches it
+        KafkaSpoutMessageId msgId = getMessageId(0);
+        startAtHighOffsetManager.addToEmitMsgs(msgId.offset());
+        startAtHighOffsetManager.addToAckMsgs(msgId);
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
+        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
+    }
+
+    @Test
+    public void testCommit() {
+        emitAndAckMessage(getMessageId(initialFetchOffset));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
+        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
+
+        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));
+
+        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
+        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
+        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
+        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
+        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
+    }
+
+    private KafkaSpoutMessageId getMessageId(long offset) {
+        return new KafkaSpoutMessageId(new ConsumerRecord<>(testTp.topic(), testTp.partition(), offset, null, null));
+    }
+
+    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
+        manager.addToEmitMsgs(msgId.offset());
+        manager.addToAckMsgs(msgId);
+    }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 7f0973b6d2f..266631d09dd 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -114,7 +114,7 @@ private void verifyAllMessagesCommitted(long messageCount) {
         Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
         assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
         OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
-        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1));
+        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
     }
 
     @Test
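To make the semantic shift concrete, here is a minimal walkthrough of the patched OffsetManager outside the test suite. It is a sketch, not part of this patch: the class name OffsetManagerWalkthrough and the literal offsets 5-7 are invented for illustration, and it assumes only the API shown above (addToEmitMsgs, addToAckMsgs, findNextCommitOffset, commit).

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.internal.OffsetManager;

public class OffsetManagerWalkthrough {

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("testTopic", 0);
        // Fetching starts at offset 5, so 5 is also the initial earliest uncommitted offset
        OffsetManager manager = new OffsetManager(tp, 5);

        // Emit and ack offsets 5, 6 and 7, as the spout would during normal processing
        for (long offset = 5; offset <= 7; offset++) {
            KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(
                new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null));
            manager.addToEmitMsgs(msgId.offset());
            manager.addToAckMsgs(msgId);
        }

        // Under the old semantics this would have been 7 (the last processed offset).
        // With this patch it is 8: the earliest uncommitted offset, i.e. exactly what
        // KafkaConsumer.commitSync expects ("the next message your application will consume").
        OffsetAndMetadata toCommit = manager.findNextCommitOffset();
        System.out.println(toCommit.offset()); // prints 8

        // After the spout's consumer.commitSync(...) call succeeds, recording the commit
        // drops offsets 5-7 from the internal acked and emitted state
        long numCommitted = manager.commit(toCommit);
        System.out.println(numCommitted); // prints 3
    }
}
```

The SingleTopicKafkaSpoutTest change at the bottom of the diff follows the same rule: after messageCount messages starting at offset 0 have been emitted and acked, the committed offset is now messageCount rather than messageCount - 1.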