OffsetManager.java:
@@ -38,8 +38,8 @@ public class OffsetManager {
     /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
     * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
    private final long initialFetchOffset;
-    // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
-    private long committedOffset;
+    // Earliest uncommitted offset, i.e. the last committed offset + 1. Initially it is set to fetchOffset
+    private long earliestUncommittedOffset;
    // Emitted Offsets List
    private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
    // Acked messages sorted by ascending order of offset
@@ -53,7 +53,7 @@ public class OffsetManager {
    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
        this.tp = tp;
        this.initialFetchOffset = initialFetchOffset;
-        this.committedOffset = initialFetchOffset - 1;
+        this.earliestUncommittedOffset = initialFetchOffset;
        LOG.debug("Instantiated {}", this);
    }
 
@@ -66,26 +66,28 @@ public void addToEmitMsgs(long offset) {
    }
 
    /**
-     * An offset is only committed when all records with lower offset have been
+     * An offset can only be committed when all emitted records with lower offset have been
     * acked. This guarantees that all offsets smaller than the committedOffset
-     * have been delivered.
+     * have been delivered, or that those offsets no longer exist in Kafka.
     * <p/>
+     * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API.
+     *
     * @return the next OffsetAndMetadata to commit, or null if no offset is
     * ready to commit.
     */
    public OffsetAndMetadata findNextCommitOffset() {
        long currOffset;
-        long nextCommitOffset = committedOffset;
+        long nextEarliestUncommittedOffset = earliestUncommittedOffset;
        KafkaSpoutMessageId nextCommitMsg = null;   // this is a convenience variable to make it faster to create OffsetAndMetadata
 
        for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
            currOffset = currAckedMsg.offset();
-            if (currOffset == nextCommitOffset + 1) {           // found the next offset to commit
+            if (currOffset == nextEarliestUncommittedOffset) {  // found the next offset to commit
                nextCommitMsg = currAckedMsg;
-                nextCommitOffset = currOffset;
-            } else if (currOffset > nextCommitOffset + 1) {
-                if (emittedOffsets.contains(nextCommitOffset + 1)) {
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]."
+                nextEarliestUncommittedOffset = currOffset + 1;
+            } else if (currOffset > nextEarliestUncommittedOffset) {
+                if (emittedOffsets.contains(nextEarliestUncommittedOffset)) {
+                    LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
                            + " It will be processed in a subsequent batch.", tp, currOffset);
                    break;
                } else {
@@ -96,31 +98,34 @@ public OffsetAndMetadata findNextCommitOffset() {
                    the next logical point in the topic. Next logical offset should be the
                    first element after committedOffset in the ascending ordered emitted set.
                    */
-                    LOG.debug("Processed non contiguous offset."
-                            + " (committedOffset+1) is no longer part of the topic."
-                            + " Committed: [{}], Processed: [{}]", committedOffset, currOffset);
-                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+                    LOG.debug("Processed non-contiguous offset."
+                            + " The earliest uncommitted offset is no longer part of the topic."
+                            + " Missing uncommitted offset: [{}], Processed: [{}]", nextEarliestUncommittedOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextEarliestUncommittedOffset);
                    if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+                        LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
+                                currOffset, nextEarliestUncommittedOffset);
                        nextCommitMsg = currAckedMsg;
-                        nextCommitOffset = currOffset;
+                        nextEarliestUncommittedOffset = currOffset + 1;
                    } else {
-                        LOG.debug("topic-partition [{}] has non-continuous offset [{}]."
-                                + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+                        LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
+                                + " Next Offset to commit should be [{}]", tp, currOffset, nextEarliestUncommittedOffset - 1);
                        break;
                    }
                }
            } else {
                //Received a redundant ack. Ignore and continue processing.
-                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
-                        tp, currOffset, committedOffset);
+                LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current earliest uncommitted offset [{}]",
+                        tp, currOffset, earliestUncommittedOffset);
            }
        }
 
        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
        if (nextCommitMsg != null) {
-            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextEarliestUncommittedOffset,
+                    nextCommitMsg.getMetadata(Thread.currentThread()));
            LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
-                    tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+                    tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1);
        } else {
            LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
        }
@@ -129,28 +134,28 @@ public OffsetAndMetadata findNextCommitOffset() {
    }
 
    /**
-     * Marks an offset has committed. This method has side effects - it sets the
+     * Marks an offset as committed. This method has side effects - it sets the
     * internal state in such a way that future calls to
-     * {@link #findNextCommitOffset()} will return offsets greater than the
+     * {@link #findNextCommitOffset()} will return offsets greater than or equal to the
     * offset specified, if any.
     *
-     * @param committedOffset offset to be marked as committed
+     * @param earliestUncommittedOffset Earliest uncommitted offset. All lower offsets are expected to have been committed.
     * @return Number of offsets committed in this commit
     */
-    public long commit(OffsetAndMetadata committedOffset) {
-        final long preCommitCommittedOffsets = this.committedOffset;
-        final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
-        this.committedOffset = committedOffset.offset();
+    public long commit(OffsetAndMetadata earliestUncommittedOffset) {
+        final long preCommitEarliestUncommittedOffset = this.earliestUncommittedOffset;
+        final long numCommittedOffsets = earliestUncommittedOffset.offset() - this.earliestUncommittedOffset;
+        this.earliestUncommittedOffset = earliestUncommittedOffset.offset();
        for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
-            if (iterator.next().offset() <= committedOffset.offset()) {
+            if (iterator.next().offset() < earliestUncommittedOffset.offset()) {
                iterator.remove();
            } else {
                break;
            }
        }
 
        for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
-            if (iterator.next() <= committedOffset.offset()) {
+            if (iterator.next() < earliestUncommittedOffset.offset()) {
                iterator.remove();
            } else {
                break;
@@ -160,13 +165,13 @@ public long commit(OffsetAndMetadata committedOffset) {
        LOG.trace("{}", this);
 
        LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
-                preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp);
+                preCommitEarliestUncommittedOffset, this.earliestUncommittedOffset - 1, numCommittedOffsets, tp);
 
        return numCommittedOffsets;
    }
 
-    public long getCommittedOffset() {
-        return committedOffset;
+    public long getEarliestUncommittedOffset() {
+        return earliestUncommittedOffset;
    }
 
    public boolean isEmpty() {
@@ -180,13 +185,17 @@ public boolean contains(ConsumerRecord record) {
    public boolean contains(KafkaSpoutMessageId msgId) {
        return ackedMsgs.contains(msgId);
    }
 
+    public boolean containsEmitted(long offset) {
+        return emittedOffsets.contains(offset);
+    }
+
    @Override
    public String toString() {
        return "OffsetManager{"
                + "topic-partition=" + tp
                + ", fetchOffset=" + initialFetchOffset
-                + ", committedOffset=" + committedOffset
+                + ", earliestUncommittedOffset=" + earliestUncommittedOffset
                + ", emittedOffsets=" + emittedOffsets
                + ", ackedMsgs=" + ackedMsgs
                + '}';
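The rename follows the KafkaConsumer.commitSync contract, under which the committed offset is the next offset the application will consume (lastProcessedMessageOffset + 1), not the last offset processed. A minimal sketch of that contract; the names consumer, tp, and lastProcessedOffset are illustrative stand-ins, not code from this patch:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommitExample {
        // Commits the position *after* the last processed record. On restart, the
        // consumer resumes from lastProcessedOffset + 1 -- the "earliest uncommitted
        // offset" that OffsetManager now tracks directly.
        static void commitProcessed(KafkaConsumer<?, ?> consumer, TopicPartition tp, long lastProcessedOffset) {
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)));
        }
    }

Storing the earliest uncommitted offset instead of the last committed one removes the scattered +1/-1 adjustments that previously bridged the two conventions.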
OffsetManagerTest.java (new file):
/*
 * Copyright 2017 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.internal;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.junit.Test;

public class OffsetManagerTest {

    private final long initialFetchOffset = 0;
    private final TopicPartition testTp = new TopicPartition("testTopic", 0);
    private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset);

    @Test
    public void testFindNextCommittedOffsetWithNoAcks() {
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
    }

    @Test
    public void testFindNextCommitOffsetWithOneAck() {
        /*
         * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume,
         * i.e. lastProcessedMessageOffset + 1."
         */
        emitAndAckMessage(getMessageId(initialFetchOffset));
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
    }

    @Test
    public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
        emitAndAckMessage(getMessageId(initialFetchOffset));
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
    }

    @Test
    public void testFindNextCommitOffsetWithAckedOffsetGap() {
        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
        manager.addToEmitMsgs(initialFetchOffset + 1);
        emitAndAckMessage(getMessageId(initialFetchOffset));
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
    }

    @Test
    public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() {
        /*
         * If topic compaction is enabled in Kafka some offsets may be deleted.
         * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking
         * by checking that offsets in the gap have been emitted at some point previously.
         * If they haven't then they can't exist in Kafka, since the spout emits tuples in order.
         */
        emitAndAckMessage(getMessageId(initialFetchOffset + 2));
        emitAndAckMessage(getMessageId(initialFetchOffset));
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
            nextCommitOffset.offset(), is(initialFetchOffset + 3));
    }

    @Test
    public void testFindNextCommitOffsetWithUnackedOffsetGap() {
        manager.addToEmitMsgs(initialFetchOffset + 1);
        emitAndAckMessage(getMessageId(initialFetchOffset));
        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
        assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
    }

    @Test
    public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
        OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
        // Emit and ack an offset below the initial fetch offset on the manager that starts at offset 10
        startAtHighOffsetManager.addToEmitMsgs(0);
        startAtHighOffsetManager.addToAckMsgs(getMessageId(0));
        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
        assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
    }

    @Test
    public void testCommit() {
        emitAndAckMessage(getMessageId(initialFetchOffset));
        emitAndAckMessage(getMessageId(initialFetchOffset + 1));
        emitAndAckMessage(getMessageId(initialFetchOffset + 2));

        long committedMessages = manager.commit(new OffsetAndMetadata(initialFetchOffset + 2));

        assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L));
        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false));
        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false));
        assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false));
        assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false));
        assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true));
        assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true));
    }

    private KafkaSpoutMessageId getMessageId(long offset) {
        return new KafkaSpoutMessageId(new ConsumerRecord<>(testTp.topic(), testTp.partition(), offset, null, null));
    }

    private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
        manager.addToEmitMsgs(msgId.offset());
        manager.addToAckMsgs(msgId);
    }
}
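Read together with the diff above, the tests trace the intended call sequence for a partition. A sketch of that flow, using only methods that appear in this pull request; the consumer interaction and the wrapper class are illustrative assumptions, not code from the patch:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.internal.OffsetManager;

    class OffsetManagerFlow {
        static void processOne(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
            OffsetManager om = new OffsetManager(tp, 0);   // earliest uncommitted offset starts at the fetch offset

            KafkaSpoutMessageId msg = new KafkaSpoutMessageId(
                    new ConsumerRecord<>(tp.topic(), tp.partition(), 0, null, null));
            om.addToEmitMsgs(msg.offset());                // the record was emitted as a tuple
            om.addToAckMsgs(msg);                          // the tuple was acked downstream

            OffsetAndMetadata next = om.findNextCommitOffset();   // offset 1: the earliest uncommitted offset
            if (next != null) {
                consumer.commitSync(Collections.singletonMap(tp, next));  // commitSync uses the same convention
                om.commit(next);                           // prunes acked/emitted state below the committed offset
            }
        }
    }

Because findNextCommitOffset() now returns the earliest uncommitted offset directly, its result can be passed to both commitSync and OffsetManager.commit without adjustment.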
In the spout-level test that verifies committed offsets, the expected value moves up by one to match the new convention:

@@ -114,7 +114,7 @@ private void verifyAllMessagesCommitted(long messageCount) {
        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
        assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
        OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
-        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1));
+        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
    }
 
    @Test
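This one-line assertion change encodes the same convention: when messageCount messages at offsets 0 through messageCount - 1 have all been emitted, acked, and committed, the offset reported to Kafka is the earliest uncommitted one. The arithmetic, spelled out with illustrative values:

    long messageCount = 10;                         // offsets 0..9 emitted and acked
    long lastProcessedOffset = messageCount - 1;    // 9
    long committedOffset = lastProcessedOffset + 1; // 10 == messageCount, matching the new assertion

Under the old convention the committed offset equaled the last processed offset, hence the previous messageCount - 1 expectation.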