KafkaSpout.java
@@ -34,6 +34,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -146,10 +147,13 @@ private boolean isAtLeastOnce() {
// =========== Consumer Rebalance Listener - On the same thread as the caller ===========
private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {

private Collection<TopicPartition> previousAssignment = new HashSet<>();

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
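//Remember the revoked assignment so that the next initialize() call can tell which partitions are newly assigned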
previousAssignment = partitions;
if (isAtLeastOnce() && initialized) {
initialized = false;
commitOffsetsForAckedTuples();
@@ -175,20 +179,14 @@ private void initialize(Collection<TopicPartition> partitions) {
* Emitted messages for partitions that are no longer assigned to this spout can't
* be acked and should not be retried, hence remove them from emitted collection.
*/
Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
while (msgIdIterator.hasNext()) {
KafkaSpoutMessageId msgId = msgIdIterator.next();
if (!partitionsSet.contains(msgId.getTopicPartition())) {
msgIdIterator.remove();
}
}
emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
}

for (TopicPartition tp : partitions) {
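//Seek only on partitions that were not in the previous assignment; partitions retained across the rebalance keep their current position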
Set<TopicPartition> newPartitions = new HashSet<>(partitions);
newPartitions.removeAll(previousAssignment);
for (TopicPartition tp : newPartitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
final long fetchOffset = doSeek(tp, committedOffset);
// Add offset managers for the new partitions.
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
@@ -202,36 +200,31 @@ private void initialize(Collection<TopicPartition> partitions) {
* sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
*/
private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
long fetchOffset;
if (committedOffset != null) { // offset was committed for this TopicPartition
if (firstPollOffsetStrategy.equals(EARLIEST)) {
kafkaConsumer.seekToBeginning(Collections.singleton(tp));
fetchOffset = kafkaConsumer.position(tp);
} else if (firstPollOffsetStrategy.equals(LATEST)) {
kafkaConsumer.seekToEnd(Collections.singleton(tp));
fetchOffset = kafkaConsumer.position(tp);
} else {
// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
fetchOffset = committedOffset.offset() + 1;
kafkaConsumer.seek(tp, fetchOffset);
kafkaConsumer.seek(tp, committedOffset.offset() + 1);
}
} else { // no commits have ever been done, so start at the beginning or end depending on the strategy
if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
kafkaConsumer.seekToBeginning(Collections.singleton(tp));
} else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
kafkaConsumer.seekToEnd(Collections.singleton(tp));
}
fetchOffset = kafkaConsumer.position(tp);
}
return fetchOffset;
return kafkaConsumer.position(tp);
}
}

// ======== Next Tuple =======
@Override
public void nextTuple() {
try {
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
}
@@ -342,12 +335,15 @@ private void emit() {
*/
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
"The spout is about to emit a message that has already been committed."
+ " This should never occur, and indicates a bug in the spout");
final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
if (isEmitTuple(tuple)) {
final boolean isScheduled = retryService.isScheduled(msgId);
@@ -411,6 +407,23 @@ private void commitOffsetsForAckedTuples() {
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
long position = kafkaConsumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {
/*
* The position is behind the committed offset. This can happen in some cases, e.g. if a message failed,
* lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked.
* The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple.
* Skip the consumer forward to the committed offset and drop the current waiting to emit list,
* since it'll likely contain committed offsets.
*/
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
position, committedOffset);
kafkaConsumer.seek(tp, committedOffset);
waitingToEmit = null;
}


final OffsetManager offsetManager = offsetManagers.get(tp);
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
@@ -440,6 +453,8 @@ public void ack(Object messageId) {
LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
}
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
@@ -460,6 +475,8 @@ public void fail(Object messageId) {
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
msgId.incrementNumFails();
if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
KafkaSpoutRetryExponentialBackoff.java
@@ -28,7 +28,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.commons.lang.Validate;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
@@ -48,6 +48,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private final TimeInterval maxDelay;
private final int maxRetries;

//This class assumes that there is at most one retry schedule per message id in this set at a time.
private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups

@@ -168,7 +169,7 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
LOG.debug("Instantiated {}", this);
LOG.debug("Instantiated {}", this.toStringImpl());
}

@Override
@@ -191,13 +192,14 @@ public Map<TopicPartition, Long> earliestRetriableOffsets() {
@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
if (toRetryMsgs.contains(msgId)) {
if (isScheduled(msgId)) {
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
retry = true;
LOG.debug("Found entry to retry {}", retrySchedule);
break; //Stop searching if the message is known to be ready for retry
}
} else {
LOG.debug("Entry to retry not found {}", retrySchedule);
@@ -216,14 +218,14 @@ public boolean isScheduled(KafkaSpoutMessageId msgId) {
@Override
public boolean remove(KafkaSpoutMessageId msgId) {
boolean removed = false;
if (toRetryMsgs.contains(msgId)) {
if (isScheduled(msgId)) {
toRetryMsgs.remove(msgId);
for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
final RetrySchedule retrySchedule = iterator.next();
if (retrySchedule.msgId().equals(msgId)) {
iterator.remove();
toRetryMsgs.remove(msgId);
removed = true;
break;
break; //There is at most one schedule per message id
}
}
}
@@ -256,15 +258,8 @@ public boolean schedule(KafkaSpoutMessageId msgId) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
return false;
} else {
if (toRetryMsgs.contains(msgId)) {
for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
final RetrySchedule retrySchedule = iterator.next();
if (retrySchedule.msgId().equals(msgId)) {
iterator.remove();
toRetryMsgs.remove(msgId);
}
}
}
//Remove existing schedule for the message id
remove(msgId);
final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
retrySchedules.add(retrySchedule);
toRetryMsgs.add(msgId);
@@ -289,9 +284,9 @@ public int readyMessageCount() {
}

@Override
public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
if (toRetryMsgs.contains(msgId)) {
public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) {
KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset);
if (isScheduled(msgId)) {
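//Return the already scheduled message id so that state such as the number of failures is preserved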
for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
if (originalMsgId.equals(msgId)) {
return originalMsgId;
Expand All @@ -303,6 +298,7 @@ public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {

// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
final long currentTimeNanos = Time.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
? currentTimeNanos + initialDelay.lengthNanos
@@ -312,6 +308,11 @@ private long nextTime(KafkaSpoutMessageId msgId) {

@Override
public String toString() {
return toStringImpl();
}

private String toStringImpl() {
//This is here to avoid an overridable call in the constructor
return "KafkaSpoutRetryExponentialBackoff{"
+ "delay=" + initialDelay
+ ", ratio=" + delayPeriod
KafkaSpoutRetryService.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

/**
@@ -85,10 +84,11 @@ public interface KafkaSpoutRetryService extends Serializable {
int readyMessageCount();

/**
* Gets the {@link KafkaSpoutMessageId} for the given record.
* @param record The record to fetch the id for
* Gets the {@link KafkaSpoutMessageId} for the record on the given topic partition and offset.
* @param topicPartition The topic partition of the record
* @param offset The offset of the record
* @return The id the record was scheduled for retry with,
* or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
*/
KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
KafkaSpoutMessageId getMessageId(TopicPartition topicPartition, long offset);
}
OffsetManager.java
@@ -99,7 +99,7 @@ public OffsetAndMetadata findNextCommitOffset() {
LOG.debug("Processed non contiguous offset."
+ " (committedOffset+1) is no longer part of the topic."
+ " Committed: [{}], Processed: [{}]", committedOffset, currOffset);
final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1);
if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
@@ -110,9 +110,9 @@ }
}
}
} else {
//Received a redundant ack. Ignore and continue processing.
LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
tp, currOffset, committedOffset);
throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed "
+ "offset [" + committedOffset + "] for [" + tp + "]."
+ " This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
}
}

KafkaSpoutCommitTest.java
@@ -42,6 +42,8 @@

import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;

import org.mockito.stubbing.OngoingStubbing;

public class KafkaSpoutCommitTest {

private final long offsetCommitPeriodMs = 2_000;
@@ -94,7 +96,7 @@ public void testCommitSuccessWithOffsetVoids() {
spout.ack(messageId);
}

// Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
// Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
@@ -104,25 +106,8 @@ public void testCommitSuccessWithOffsetVoids() {
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(consumerMock).poll(anyLong());

//verify that Offset 4 was last committed offset
//the offset void should be bridged in the next commit
Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
assertTrue(commits.containsKey(partition));
assertEquals(4, commits.get(partition).offset());

//Trigger second kafka consumer commit
reset(consumerMock);
when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
spout.nextTuple();

inOrder = inOrder(consumerMock);
inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(consumerMock).poll(anyLong());

//verify that Offset 9 was last committed offset
commits = commitCapture.getValue();
Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
assertTrue(commits.containsKey(partition));
assertEquals(9, commits.get(partition).offset());
}