diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index a5f169c5dc0..4d882055a78 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -34,7 +34,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; - +import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -127,10 +127,13 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect // =========== Consumer Rebalance Listener - On the same thread as the caller =========== private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + private Collection previousAssignment = new HashSet<>(); + @Override public void onPartitionsRevoked(Collection partitions) { LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + previousAssignment = partitions; if (!consumerAutoCommitMode && initialized) { initialized = false; commitOffsetsForAckedTuples(); @@ -148,27 +151,30 @@ public void onPartitionsAssigned(Collection partitions) { private void initialize(Collection partitions) { if (!consumerAutoCommitMode) { offsetManagers.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout - } - - retryService.retainAll(partitions); - - /* - * Emitted messages for partitions that are no longer assigned to this spout can't - * be acked and should not be retried, hence remove them from emitted collection. - */ - Set partitionsSet = new HashSet<>(partitions); - Iterator msgIdIterator = emitted.iterator(); - while (msgIdIterator.hasNext()) { - KafkaSpoutMessageId msgId = msgIdIterator.next(); - if (!partitionsSet.contains(msgId.getTopicPartition())) { - msgIdIterator.remove(); + retryService.retainAll(partitions); + + /* + * Emitted messages for partitions that are no longer assigned to this spout can't + * be acked and should not be retried, hence remove them from emitted collection. + */ + Iterator msgIdIterator = emitted.iterator(); + while (msgIdIterator.hasNext()) { + KafkaSpoutMessageId msgId = msgIdIterator.next(); + if (!partitions.contains(msgId.getTopicPartition())) { + msgIdIterator.remove(); + } } } - for (TopicPartition tp : partitions) { + Set newPartitions = new HashSet<>(partitions); + newPartitions.removeAll(previousAssignment); + for (TopicPartition tp : newPartitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); - setAcked(tp, fetchOffset); + // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off + if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) { + offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); + } } initialized = true; LOG.info("Initialization complete"); @@ -178,18 +184,14 @@ private void initialize(Collection partitions) { * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset */ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { - long fetchOffset; if (committedOffset != null) { // offset was committed for this TopicPartition if (firstPollOffsetStrategy.equals(EARLIEST)) { kafkaConsumer.seekToBeginning(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else { // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. - fetchOffset = committedOffset.offset() + 1; - kafkaConsumer.seek(tp, fetchOffset); + kafkaConsumer.seek(tp, committedOffset.offset() + 1); } } else { // no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { @@ -197,9 +199,8 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } - fetchOffset = kafkaConsumer.position(tp); } - return fetchOffset; + return kafkaConsumer.position(tp); } } @@ -215,7 +216,7 @@ private void setAcked(TopicPartition tp, long fetchOffset) { @Override public void nextTuple() { try { - if (initialized) { + if (initialized) { if (commit()) { commitOffsetsForAckedTuples(); } @@ -324,6 +325,9 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { + Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp), + "The spout is about to emit a message that has already been committed." + + " This should never occur, and indicates a bug in the spout"); final List tuple = kafkaSpoutConfig.getTranslator().apply(record); if (isEmitTuple(tuple)) { final boolean isScheduled = retryService.isScheduled(msgId); @@ -388,6 +392,23 @@ private void commitOffsetsForAckedTuples() { for (Map.Entry tpOffset : nextCommitOffsets.entrySet()) { //Update the OffsetManager for each committed partition, and update numUncommittedOffsets final TopicPartition tp = tpOffset.getKey(); + long position = kafkaConsumer.position(tp); + long committedOffset = tpOffset.getValue().offset(); + if (position < committedOffset) { + /* + * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, + * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. + * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. + * Skip the consumer forward to the committed offset drop the current waiting to emit list, + * since it'll likely contain committed offsets. + */ + LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", + position, committedOffset); + kafkaConsumer.seek(tp, committedOffset); + waitingToEmit = null; + } + + final OffsetManager offsetManager = offsetManagers.get(tp); long numCommittedOffsets = offsetManager.commit(tpOffset.getValue()); numUncommittedOffsets -= numCommittedOffsets; @@ -413,6 +434,8 @@ public void ack(Object messageId) { LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); } } else { + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." + + " This should never occur barring errors in the RetryService implementation or the spout code."); if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); } @@ -429,6 +452,8 @@ public void fail(Object messageId) { LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); return; } + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed." + + " This should never occur barring errors in the RetryService implementation or the spout code."); msgId.incrementNumFails(); if (!retryService.schedule(msgId)) { LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 6b53779563b..68a6f3fe856 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.utils.Time; @@ -49,6 +50,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService private final TimeInterval maxDelay; private final int maxRetries; + //This class assumes that there is at most one retry schedule per message id in this set at a time. private final Set retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); private final Set toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups @@ -168,7 +170,7 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval this.delayPeriod = delayPeriod; this.maxRetries = maxRetries; this.maxDelay = maxDelay; - LOG.debug("Instantiated {}", this); + LOG.debug("Instantiated {}", this.toStringImpl()); } @Override @@ -196,13 +198,14 @@ public Map earliestRetriableOffsets() { @Override public boolean isReady(KafkaSpoutMessageId msgId) { boolean retry = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { final long currentTimeNanos = Time.nanoTime(); for (RetrySchedule retrySchedule : retrySchedules) { if (retrySchedule.retry(currentTimeNanos)) { if (retrySchedule.msgId.equals(msgId)) { retry = true; LOG.debug("Found entry to retry {}", retrySchedule); + break; //Stop searching if the message is known to be ready for retry } } else { LOG.debug("Entry to retry not found {}", retrySchedule); @@ -221,14 +224,14 @@ public boolean isScheduled(KafkaSpoutMessageId msgId) { @Override public boolean remove(KafkaSpoutMessageId msgId) { boolean removed = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { + toRetryMsgs.remove(msgId); for (Iterator iterator = retrySchedules.iterator(); iterator.hasNext(); ) { final RetrySchedule retrySchedule = iterator.next(); if (retrySchedule.msgId().equals(msgId)) { iterator.remove(); - toRetryMsgs.remove(msgId); removed = true; - break; + break; //There is at most one schedule per message id } } } @@ -261,15 +264,8 @@ public boolean schedule(KafkaSpoutMessageId msgId) { LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries); return false; } else { - if (toRetryMsgs.contains(msgId)) { - for (Iterator iterator = retrySchedules.iterator(); iterator.hasNext(); ) { - final RetrySchedule retrySchedule = iterator.next(); - if (retrySchedule.msgId().equals(msgId)) { - iterator.remove(); - toRetryMsgs.remove(msgId); - } - } - } + //Remove existing schedule for the message id + remove(msgId); final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId)); retrySchedules.add(retrySchedule); toRetryMsgs.add(msgId); @@ -296,7 +292,7 @@ public int readyMessageCount() { @Override public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { if (originalMsgId.equals(msgId)) { return originalMsgId; @@ -308,6 +304,7 @@ public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE private long nextTime(KafkaSpoutMessageId msgId) { + Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once"); final long currentTimeNanos = Time.nanoTime(); final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... ? currentTimeNanos + initialDelay.lengthNanos @@ -317,6 +314,11 @@ private long nextTime(KafkaSpoutMessageId msgId) { @Override public String toString() { + return toStringImpl(); + } + + private String toStringImpl() { + //This is here to avoid an overridable call in the constructor return "KafkaSpoutRetryExponentialBackoff{" + "delay=" + initialDelay + ", ratio=" + delayPeriod + diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java index 5147752b588..12d26dac65a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -18,12 +18,12 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.common.TopicPartition; import java.io.Serializable; import java.util.Collection; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; /** * Represents the logic that manages the retrial of failed tuples. @@ -85,8 +85,10 @@ public interface KafkaSpoutRetryService extends Serializable { /** * Gets the {@link KafkaSpoutMessageId} for the given record. + * * @param record The record to fetch the id for - * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry. + * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for + * retry. */ KafkaSpoutMessageId getMessageId(ConsumerRecord record); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 51390720714..b6d36d83b6f 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -92,7 +92,7 @@ public OffsetAndMetadata findNextCommitOffset() { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; @@ -103,9 +103,9 @@ public OffsetAndMetadata findNextCommitOffset() { } } } else { - //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", - tp, currOffset, committedOffset); + throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed " + + "offset [" + committedOffset + "] for [" + tp + "]." + + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic."); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index b7737c7eb94..8710f7b5104 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -42,6 +42,8 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.OngoingStubbing; + public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -114,7 +116,7 @@ public void testCommitSuccessWithOffsetVoids() { spout.ack(messageId); } - // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4 + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); Map>> emptyConsumerRecords = Collections.emptyMap(); when(consumerMock.poll(anyLong())) @@ -125,25 +127,8 @@ public void testCommitSuccessWithOffsetVoids() { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 4 was last committed offset - //the offset void should be bridged in the next commit - Map commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - assertEquals(4, commits.get(partition).offset()); - - //Trigger second kafka consumer commit - reset(consumerMock); - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(emptyConsumerRecords)); - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - spout.nextTuple(); - - inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 9 was last committed offset - commits = commitCapture.getValue(); + Map commits = commitCapture.getValue(); assertTrue(commits.containsKey(partition)); assertEquals(9, commits.get(partition).offset()); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index bd6e5826a31..1c7f723bd4d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -17,11 +17,13 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -32,8 +34,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -171,7 +175,7 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class))) .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0)) .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); - + //Emit a message on each partition and revoke the first partition List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); @@ -186,4 +190,50 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); verify(retryServiceMock).schedule(emittedMessageIds.get(1)); } + + @Test + public void testReassignPartitionSeeksForOnlyNewPartitions() { + /* + * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions. + * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those. + */ + + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) + .build(), consumerFactoryMock); + String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; + TopicPartition assignedPartition = new TopicPartition(topic, 1); + TopicPartition newPartition = new TopicPartition(topic, 2); + + //Setup spout with mock consumer so we can get at the rebalance listener + spout.open(conf, contextMock, collectorMock); + spout.activate(); + + verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + + //Assign partitions to the spout + ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); + Set assignedPartitions = new HashSet<>(); + assignedPartitions.add(assignedPartition); + consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); + reset(consumerMock); + + //Set up committed so it looks like some messages have been committed on each partition + long committedOffset = 500; + when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + + //Now rebalance and add a new partition + consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); + Set newAssignedPartitions = new HashSet<>(); + newAssignedPartitions.add(assignedPartition); + newAssignedPartitions.add(newPartition); + consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions); + + //This partition was previously assigned, so the consumer position shouldn't change + verify(consumerMock, never()).seek(eq(assignedPartition), anyLong()); + //This partition is new, and should start at the committed offset + verify(consumerMock).seek(newPartition, committedOffset + 1); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java new file mode 100644 index 00000000000..f6de6a80261 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; + +public class KafkaSpoutRetryExponentialBackoffTest { + + private final TopicPartition testTopic = new TopicPartition("topic", 0); + private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0); + + private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0)); + } + + private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1)); + } + + private ConsumerRecord createRecord(TopicPartition tp, long offset) { + return new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null); + } + + @Test + public void testCanScheduleRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must schedule the message for retry", scheduled, is(true)); + KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(createRecord(testTopic, offset)); + assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.isReady(msgId), is(true)); + assertThat(retryService.readyMessageCount(), is(1)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + } + + @Test + public void testCanRescheduleRetry() { + try (SimulatedTime time = new SimulatedTime()) { + + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must be able to reschedule an already scheduled id", scheduled, is(true)); + Time.advanceTime(500); + assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + Time.advanceTime(500); + assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + assertThat(retryService.readyMessageCount(), is(1)); + } + } + + @Test + public void testCannotContainMultipleSchedulesForId() { + try (SimulatedTime time = new SimulatedTime()) { + + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + boolean scheduled = retryService.schedule(msgId); + + retryService.remove(msgId); + assertThat("The message should no longer be scheduled", retryService.isScheduled(msgId), is(false)); + Time.advanceTime(500); + assertThat("The message should not be ready for retry because it isn't scheduled", retryService.isReady(msgId), is(false)); + } + } + + @Test + public void testCanRemoveRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + boolean removed = retryService.remove(msgId); + + assertThat(removed, is(true)); + assertThat(retryService.isScheduled(msgId), is(false)); + assertThat(retryService.isReady(msgId), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + } + + @Test + public void testCanHandleMultipleTopics() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(createRecord(testTopic, offset)); + KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(createRecord(testTopic2, offset)); + msgIdTp1.incrementNumFails(); + msgIdTp2.incrementNumFails(); + + boolean scheduledOne = retryService.schedule(msgIdTp1); + Time.advanceTime(500); + boolean scheduledTwo = retryService.schedule(msgIdTp2); + + //The retry schedules for two messages should be unrelated + assertThat(scheduledOne, is(true)); + assertThat(retryService.isScheduled(msgIdTp1), is(true)); + assertThat(scheduledTwo, is(true)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp1), is(true)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset))); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp2), is(true)); + Map earliestOffsets = new HashMap<>(); + earliestOffsets.put(testTopic, offset); + earliestOffsets.put(testTopic2, offset); + assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets)); + + //The service must be able to remove retry schedules for unnecessary partitions + retryService.retainAll(Collections.singleton(testTopic2)); + assertThat(retryService.isScheduled(msgIdTp1), is(false)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset))); + } + } + + @Test + public void testCanHandleMultipleMessagesOnPartition() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(createRecord(testTopic, offset)); + KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(createRecord(testTopic, offset + 1)); + msgIdEarliest.incrementNumFails(); + msgIdLatest.incrementNumFails(); + + retryService.schedule(msgIdEarliest); + Time.advanceTime(500); + retryService.schedule(msgIdLatest); + + assertThat(retryService.isScheduled(msgIdEarliest), is(true)); + assertThat(retryService.isScheduled(msgIdLatest), is(true)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + + retryService.remove(msgIdEarliest); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset()))); + } + } + + @Test + public void testMaxRetries() { + try (SimulatedTime time = new SimulatedTime()) { + int maxRetries = 3; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + for (int i = 0; i < maxRetries; i++) { + msgId.incrementNumFails(); + } + + //Should be allowed to retry 3 times, in addition to original try + boolean scheduled = retryService.schedule(msgId); + + assertThat(scheduled, is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + + retryService.remove(msgId); + msgId.incrementNumFails(); + boolean rescheduled = retryService.schedule(msgId); + + assertThat("The message should not be allowed to retry once the limit is reached", rescheduled, is(false)); + assertThat(retryService.isScheduled(msgId), is(false)); + } + } + + @Test + public void testMaxDelay() { + try (SimulatedTime time = new SimulatedTime()) { + int maxDelaySecs = 2; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + + Time.advanceTimeSecs(maxDelaySecs); + assertThat("The message should be ready for retry after the max delay", retryService.isReady(msgId), is(true)); + } + } + + private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) { + Time.advanceTimeSecs(expectedBackoffSeconds - 1); + assertThat("The message should not be ready for retry until the backoff has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat(retryService.isReady(msgId), is(true)); + } + + @Test + public void testExponentialBackoff() { + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); + msgId.incrementNumFails(); + msgId.incrementNumFails(); //First failure is the initial delay, so not interesting + + //Expecting 4*2^(failCount-1) + List expectedBackoffsSecs = Arrays.asList(new Integer[]{8, 16, 32}); + + for (Integer expectedBackoffSecs : expectedBackoffsSecs) { + retryService.schedule(msgId); + + Time.advanceTimeSecs(expectedBackoffSecs - 1); + assertThat("The message should not be ready for retry until backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat("The message should be ready for retry once backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(true)); + + msgId.incrementNumFails(); + retryService.remove(msgId); + } + } + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 6042c80cd34..fff6902083d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -36,7 +36,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyObject; @@ -53,12 +52,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.hamcrest.Matchers; import org.junit.Before; import org.mockito.Captor; import org.mockito.MockitoAnnotations; @@ -78,7 +79,8 @@ public class SingleTopicKafkaSpoutTest { private KafkaConsumer consumerSpy; private KafkaConsumerFactory consumerFactory; private KafkaSpout spout; - private int maxRetries = 3; + private final int maxPollRecords = 10; + private final int maxRetries = 3; @Before public void setUp() { @@ -87,6 +89,7 @@ public void setUp() { .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .build(); this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = new KafkaConsumerFactory() { @@ -129,6 +132,59 @@ private void verifyAllMessagesCommitted(long messageCount) { assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); } + @Test + public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception { + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = maxPollRecords * 2; + initializeSpout(messageCount); + + //Emit all messages and fail the first one while acking the rest + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + ArgumentCaptor messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); + List messageIds = messageIdCaptor.getAllValues(); + for (int i = 1; i < messageIds.size(); i++) { + spout.ack(messageIds.get(i)); + } + KafkaSpoutMessageId failedTuple = messageIds.get(0); + spout.fail(failedTuple); + + //Advance the time and replay the failed tuple. + reset(collector); + spout.nextTuple(); + ArgumentCaptor failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); + + assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); + + /* Ack the tuple, and commit. + * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll. + */ + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); + spout.ack(failedIdReplayCaptor.getValue()); + spout.nextTuple(); + verify(consumerSpy).commitSync(commitCapture.capture()); + + Map capturedCommit = commitCapture.getValue(); + TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1)); + + /* Verify that the following acked (now committed) tuples are not emitted again + * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, + * this verifies that the spout keeps the consumer position ahead of the committed offset when committing + */ + reset(collector); + //Just do a few polls to check that nothing more is emitted + for(int i = 0; i < 3; i++) { + spout.nextTuple(); + } + verify(collector, never()).emit(anyString(), anyList(), anyObject()); + } + } + @Test public void shouldContinueWithSlowDoubleAcks() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { @@ -286,7 +342,7 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception { verifyAllMessagesCommitted(messageCount); } } - + @Test public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { //The spout must reemit retriable tuples, even if they fail out of order. diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java new file mode 100644 index 00000000000..e8896c9bd7e --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.util.NoSuchElementException; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class OffsetManagerTest { + + @Rule + public ExpectedException expect = ExpectedException.none(); + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() { + /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets + * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked + * then any prior message will have been emitted at least once. + * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped. + */ + + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + manager.addToEmitMsgs(0); + manager.addToEmitMsgs(1); + manager.addToEmitMsgs(2); + //3, 4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L)); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() { + + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + //0-4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue())); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + +}