diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b181c43c3da..3364fb080f7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -91,9 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
-    // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
-    private transient long numUncommittedOffsets;
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
    private transient TopologyContext context;
@@ -115,7 +113,6 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
 
         // Spout internals
         this.collector = collector;
-        numUncommittedOffsets = 0;
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -227,13 +224,19 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
     public void nextTuple() {
         try {
             if (initialized) {
+
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    kafkaSpoutConfig.getSubscription().refreshAssignment();
+                }
+
                 if (commit()) {
                     commitOffsetsForAckedTuples();
                 }
 
-                if (poll()) {
+                Set<TopicPartition> pollablePartitions = poll();
+                if (!pollablePartitions.isEmpty()) {
                     try {
-                        setWaitingToEmit(pollKafkaBroker());
+                        setWaitingToEmit(pollKafkaBroker(pollablePartitions));
                     } catch (RetriableException e) {
                         LOG.error("Failed to poll from kafka.", e);
                     }
@@ -260,27 +263,38 @@ private boolean commit() {
         return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
     }
 
-    private boolean poll() {
+    private Set<TopicPartition> poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final int readyMessageCount = retryService.readyMessageCount();
-        final boolean poll = !waitingToEmit()
-            // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
-            // Accounting for retriable tuples in this way still guarantees that the limit is followed on a per partition basis,
-            // and prevents locking up the spout when there are too many retriable tuples
-            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || !isAtLeastOnceProcessing());
-
-        if (!poll) {
-            if (waitingToEmit()) {
-                LOG.debug("Not polling. Tuples waiting to be emitted."
-                    + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
-            }
-
-            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnceProcessing()) {
-                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
-                    numUncommittedOffsets, maxUncommittedOffsets);
+
+        if (waitingToEmit()) {
+            LOG.debug("Not polling. Tuples waiting to be emitted.");
+            return Collections.emptySet();
+        }
+        Set<TopicPartition> assignment = kafkaConsumer.assignment();
+        if (!isAtLeastOnceProcessing()) {
+            return assignment;
+        }
+        Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
+        Set<TopicPartition> pollablePartitions = new HashSet<>();
+        for (TopicPartition tp : assignment) {
+            OffsetManager offsetManager = offsetManagers.get(tp);
+            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
+            if (numUncommittedOffsets < maxUncommittedOffsets) {
+                //Allow poll if the partition is not at the maxUncommittedOffsets limit
+                pollablePartitions.add(tp);
+            } else {
+                long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
+                Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
+                if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
+                    //Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
+                    pollablePartitions.add(tp);
+                } else {
+                    LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which has reached the limit of [{}]", tp,
+                        numUncommittedOffsets, maxUncommittedOffsets);
+                }
             }
         }
-        return poll;
+        return pollablePartitions;
     }
 
     private boolean waitingToEmit() {
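
Note on the new gate: poll() now decides pollability per partition from that partition's own OffsetManager state, instead of the old spout-global numUncommittedOffsets counter. A minimal standalone sketch of the rule, with made-up numbers (class name and values are illustrative only):

    import java.util.Arrays;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class PollGateSketch {
        public static void main(String[] args) {
            // Assumed partition state: committed offset 0, offsets 1..5 emitted but uncommitted
            NavigableSet<Long> emittedOffsets = new TreeSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
            int maxUncommittedOffsets = 3;
            // 5 uncommitted >= 3, so the partition is at the limit. It stays pollable only if
            // a retriable offset lies at or before the 3rd uncommitted offset (here 3L).
            long offsetAtLimit = emittedOffsets.stream().skip(maxUncommittedOffsets - 1).findFirst().get();
            long earliestRetriableOffset = 2L; // pretend the tuple at offset 2 failed
            System.out.println(earliestRetriableOffset <= offsetAtLimit); // true -> partition is pollable
        }
    }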
@@ -296,28 +310,34 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
     }
 
     // ======== poll =========
-    private ConsumerRecords<K, V> pollKafkaBroker() {
-        doSeekRetriableTopicPartitions();
-        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-            kafkaSpoutConfig.getSubscription().refreshAssignment();
-        }
-        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
-        final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
-            numPolledRecords, numUncommittedOffsets);
-        if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
-            //Commit polled records immediately to ensure delivery is at-most-once.
-            kafkaConsumer.commitSync();
+    private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) {
+        doSeekRetriableTopicPartitions(pollablePartitions);
+        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
+        pausedPartitions.removeIf(pollablePartitions::contains);
+        try {
+            kafkaConsumer.pause(pausedPartitions);
+            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            final int numPolledRecords = consumerRecords.count();
+            LOG.debug("Polled [{}] records from Kafka.",
+                numPolledRecords);
+            if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+                //Commit polled records immediately to ensure delivery is at-most-once.
+                kafkaConsumer.commitSync();
+            }
+            return consumerRecords;
+        } finally {
+            kafkaConsumer.resume(pausedPartitions);
         }
-        return consumerRecords;
     }
 
-    private void doSeekRetriableTopicPartitions() {
+    private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
         final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();
         for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
-            //Seek directly to the earliest retriable message for each retriable topic partition
-            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+            if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
+                //Seek directly to the earliest retriable message for each retriable topic partition
+                kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+            }
         }
     }
@@ -367,8 +387,6 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
             if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
                 retryService.remove(msgId);
-            } else {    //New tuple, hence increment the uncommitted offset counter
-                numUncommittedOffsets++;
             }
             collector.emit(stream, tuple, msgId);
             tupleListener.onEmit(tuple, msgId);
@@ -429,10 +447,8 @@ private void commitOffsetsForAckedTuples() {
                 final OffsetManager offsetManager = offsetManagers.get(tp);
-                long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
-                numUncommittedOffsets -= numCommittedOffsets;
-                LOG.debug("[{}] uncommitted offsets across all topic partitions",
-                    numUncommittedOffsets);
+                offsetManager.commit(tpOffset.getValue());
+                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);
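
For reviewers less familiar with the consumer API used in pollKafkaBroker: pause and resume do not change the assignment, they only control which assigned partitions poll() may return records from, which is what makes the per-partition limit enforceable. A minimal sketch of the pattern outside the spout (broker address, group id and topic are placeholders):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseResumeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "pause-resume-sketch");     // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("some-topic")); // placeholder
                consumer.poll(0); // join the group so an assignment exists
                Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
                paused.removeIf(tp -> tp.partition() == 0); // pretend only partition 0 is pollable
                consumer.pause(paused);
                try {
                    // Only unpaused partitions can contribute records here
                    ConsumerRecords<String, String> records = consumer.poll(200);
                } finally {
                    consumer.resume(paused); // always restore, as the spout does in its finally block
                }
            }
        }
    }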
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6a693fe24f6..d5fceb46745 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -253,11 +253,12 @@ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
         }
 
         /**
-         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this
-         * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets
-         * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
-         * but no partition will exceed this limit by more than maxPollRecords - 1.
-         *
+         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
+         * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
+         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * This limit is per partition and may in some cases be exceeded,
+         * but each partition cannot exceed this limit by more than maxPollRecords - 1.
+         *
         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
         *
        * @param maxUncommittedOffsets max number of records that can be pending commit
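
To make the reworded javadoc concrete: with setMaxUncommittedOffsets(n) and max.poll.records m, a single partition can reach at most n + m - 1 uncommitted offsets. A hypothetical configuration showing the two knobs together (broker, topic and values are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class MaxUncommittedConfigSketch {
        public static void main(String[] args) {
            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("localhost:9092", "some-topic") // placeholders
                    .setMaxUncommittedOffsets(250) // per-partition cap on offsets pending commit
                    .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200) // worst case per partition: 250 + 200 - 1
                    .build();
        }
    }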
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 075c4dd6308..e7711b0bb92 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -20,6 +20,7 @@
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.TreeSet;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +66,27 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) {          // O(Log N)
     public void addToEmitMsgs(long offset) {
         this.emittedOffsets.add(offset);                  // O(Log N)
     }
+
+    public int getNumUncommittedOffsets() {
+        return this.emittedOffsets.size();
+    }
+
+    /**
+     * Gets the offset of the nth emitted message after the committed offset.
+     * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been emitted,
+     * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
+     *
+     * @param index The index of the message to get the offset for
+     * @return The offset
+     * @throws NoSuchElementException if the index is out of range
+     */
+    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
+        Iterator<Long> offsetIter = emittedOffsets.iterator();
+        for (int i = 0; i < index - 1; i++) {
+            offsetIter.next();
+        }
+        return offsetIter.next();
+    }
 
     /**
      * An offset can only be committed when all emitted records with lower offset have been
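
The javadoc example for getNthUncommittedOffsetAfterCommittedOffset, replayed as a runnable sketch against a plain NavigableSet (OffsetManager keeps emittedOffsets in a TreeSet, so iteration is in ascending offset order):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class NthUncommittedOffsetSketch {
        public static void main(String[] args) {
            // Committed offset 0; offsets 1, 2, 8 and 10 have been emitted
            NavigableSet<Long> emittedOffsets = new TreeSet<>(Arrays.asList(1L, 2L, 8L, 10L));
            System.out.println(nth(emittedOffsets, 3)); // 8
            System.out.println(nth(emittedOffsets, 4)); // 10
            // nth(emittedOffsets, 5) would throw NoSuchElementException, as documented
        }

        static long nth(NavigableSet<Long> emittedOffsets, int index) {
            Iterator<Long> it = emittedOffsets.iterator();
            for (int i = 0; i < index - 1; i++) {
                it.next();
            }
            return it.next();
        }
    }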
"value")); - } + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 74341da8a90..7cfd6b78c2e 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -26,8 +26,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -73,11 +76,7 @@ public void testNextTupleEmitsAtMostOneTuple() { //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); - List> recordsForPartition = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); - } - records.put(partition, recordsForPartition); + records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(records)); @@ -90,27 +89,24 @@ public void testNextTupleEmitsAtMostOneTuple() { @Test public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() { //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded - + //Emit maxUncommittedOffsets messages, and fail all of them. 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 74341da8a90..7cfd6b78c2e 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -26,8 +26,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -73,11 +76,7 @@ public void testNextTupleEmitsAtMostOneTuple() {
         //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
         KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-        }
-        records.put(partition, recordsForPartition);
+        records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10));
 
         when(consumerMock.poll(anyLong()))
             .thenReturn(new ConsumerRecords<>(records));
@@ -90,27 +89,24 @@ public void testNextTupleEmitsAtMostOneTuple() {
 
     @Test
     public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
         //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
-
+        //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
 
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
+            int numRecords = spoutConfig.getMaxUncommittedOffsets();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
 
             when(consumerMock.poll(anyLong()))
                 .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
 
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
@@ -120,16 +116,16 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
             Time.advanceTime(50);
             //No backoff for test retry service, just check that messages will retry immediately
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture());
 
             //Verify that the poll started at the earliest retriable tuple offset
             List<Long> failedOffsets = new ArrayList<>();
-            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                 failedOffsets.add(msgId.offset());
             }
             InOrder inOrder = inOrder(consumerMock);
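
These tests advance retry backoff with Storm's simulated clock rather than sleeping. A minimal sketch of that utility in isolation:

    import org.apache.storm.utils.Time;
    import org.apache.storm.utils.Time.SimulatedTime;

    public class SimulatedTimeSketch {
        public static void main(String[] args) {
            try (SimulatedTime simulatedTime = new SimulatedTime()) {
                long before = Time.currentTimeMillis();
                Time.advanceTime(50); // moves the simulated clock 50 ms without sleeping
                System.out.println(Time.currentTimeMillis() - before); // 50
            }
        }
    }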
@@ -137,93 +133,60 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
             inOrder.verify(consumerMock).poll(anyLong());
         }
     }
-
+
     @Test
-    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
-        /*
-        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
-        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
-        The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
-        This means that the latest offset a poll can start at for a retriable partition,
-        counting from the last committed offset, is maxUncommittedOffsets,
-        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
-        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
-        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
-
-        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
-        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
-
-        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
-        */
-
-        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
-
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            firstPollRecords.put(partition, firstPollRecordsForPartition);
-
-            int maxPollRecords = 5;
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
-            for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
-            }
-            secondPollRecords.put(partition, secondPollRecordsForPartition);
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            Set<TopicPartition> partitions = new HashSet<>();
+            partitions.add(partition);
+            partitions.add(partitionTwo);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets()));
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords<>(firstPollRecords))
-                .thenReturn(new ConsumerRecords<>(secondPollRecords));
+                .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
-            KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
-            spout.fail(failedMessageId);
-
+            verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(anyString(), anyList(), messageIds.capture());
+
+            //Now fail a tuple on the first partition and verify that it is allowed to retry
+            //The other partition should be paused, since it is at the uncommitted offsets limit
+            Optional<KafkaSpoutMessageId> failedMessageId = messageIds.getAllValues().stream()
+                .filter(messageId -> messageId.partition() == partition.partition())
+                .findAny();
+
+            spout.fail(failedMessageId.get());
+
             reset(collectorMock);
-
-            //Now make the single failed tuple retriable
+
             Time.advanceTime(50);
-            //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
-            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture());
-            reset(collectorMock);
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, failedMessageId.get().offset(), 1))));
+
+            spout.nextTuple();
+
+            verify(collectorMock, times(1)).emit(anyObject(), anyObject(), anyObject());
 
-            //Check that the consumer started polling at the failed tuple offset
             InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).seek(partition, failedMessageId.offset());
+            inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
             inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
 
-            //Now fail all except one of the last batch, and check that the spout won't reemit any tuples because there are more than maxUncommittedOffsets nonretriable tuples
-            Time.advanceTime(50);
-            List<KafkaSpoutMessageId> retryBatchMessageIds = retryBatchMessageIdsCaptor.getAllValues();
-            KafkaSpoutMessageId firstTupleFromRetryBatch = retryBatchMessageIds.remove(0);
-            for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
-                spout.fail(msgId);
-            }
-            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-            verify(collectorMock, never()).emit(any(), any(), any());
+            reset(collectorMock);
 
-            //Fail the last tuple, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again
-            spout.fail(firstTupleFromRetryBatch);
+            //Now also check that no more tuples are polled for, since both partitions are at their limits
             spout.nextTuple();
-            verify(collectorMock, times(1)).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+            verify(collectorMock, never()).emit(anyObject(), anyObject(), anyObject());
         }
     }
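
The pause/poll/resume assertion in the rewritten test uses Mockito's InOrder verification; a condensed, self-contained reminder of how that API behaves (the mocked list is a stand-in for the consumer):

    import static org.mockito.Mockito.inOrder;
    import static org.mockito.Mockito.mock;

    import java.util.List;
    import org.mockito.InOrder;

    public class InOrderSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<String> consumerStandIn = mock(List.class);
            consumerStandIn.add("pause");
            consumerStandIn.add("poll");
            consumerStandIn.add("resume");
            InOrder inOrder = inOrder(consumerStandIn);
            inOrder.verify(consumerStandIn).add("pause");  // fails if the calls
            inOrder.verify(consumerStandIn).add("poll");   // did not happen in
            inOrder.verify(consumerStandIn).add("resume"); // exactly this order
        }
    }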
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 830ac2862f9..d4ae7730650 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -29,8 +29,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -89,19 +91,16 @@ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(
 
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
         assignedPartitions.add(assignedPartition);
         consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.createRecords(partitionThatWillBeRevoked, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.createRecords(assignedPartition, 0, 1))))
             .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 
         //Emit the messages
@@ -116,6 +115,7 @@ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
 
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 6df6bc41ee9..1cad6c2542f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -81,22 +81,19 @@ public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
             int lastOffset = 3;
-            for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
+            int numRecords = lastOffset + 1;
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
 
             when(consumerMock.poll(anyLong()))
                 .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
 
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 84d8f23102c..501f7334490 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,6 +15,7 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.everyItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -95,7 +96,7 @@ private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
         for (int i = 0; i < messageCount; i++) {
             spout.nextTuple();
-        };
+        }
         verify(collector, times(maxUncommittedOffsets)).emit(
             anyObject(),
             anyObject(),
@@ -171,8 +172,13 @@ private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
@@ -180,7 +186,7 @@ public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages()
 
         failAllExceptTheFirstMessageThenCommit(messageIds);
 
-        //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+        //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed but not retriable
         //The spout should now emit another maxPollRecords messages
         //This is allowed because the acked message brings the numUncommittedOffsets below the cap
         for (int i = 0; i < maxUncommittedOffsets; i++) {
@@ -202,18 +208,20 @@ public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages()
             .collect(Collectors.toList());
         assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
 
-        //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
-        //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
-        //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples as long as numNonRetriableEmittedTuples < maxUncommittedOffsets
-
-        int numNonRetriableEmittedTuples = maxPollRecords; //The other tuples were failed and are becoming retriable
-        int allowedPolls = (int)Math.ceil((maxUncommittedOffsets - numNonRetriableEmittedTuples)/(double)maxPollRecords);
+        //Offset 0 is committed, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+        //Fail the last tuples so only offset 0 is not failed.
+        //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples
+        //for all the failed tuples that are within maxUncommittedOffsets tuples of the committed offset
+        //This means 1 to maxUncommittedOffsets, but not maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1
+        for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+            spout.fail(msgId);
+        }
         Time.advanceTimeSecs(initialRetryDelaySecs);
         for (int i = 0; i < numMessages; i++) {
             spout.nextTuple();
         }
         ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector, times(allowedPolls*maxPollRecords)).emit(
+        verify(collector, times(maxUncommittedOffsets)).emit(
             anyString(),
             anyList(),
             thirdRunMessageIds.capture());
@@ -222,7 +230,7 @@ public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages()
         List<Long> thirdRunOffsets = thirdRunMessageIds.getAllValues().stream()
             .map(msgId -> msgId.offset())
             .collect(Collectors.toList());
-        assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch", thirdRunOffsets, everyItem(isIn(firstRunOffsets)));
+        assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch, plus the first failed tuple from the second batch", thirdRunOffsets, everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
     }
 }
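
Worked through with hypothetical values (the test's real constants are defined elsewhere in the file), the comment's arithmetic looks like this:

    public class RetryWindowArithmeticSketch {
        public static void main(String[] args) {
            int maxUncommittedOffsets = 10; // assumed value
            int maxPollRecords = 5;         // assumed value
            // Offset 0 committed; offsets 1..14 emitted and failed.
            // The 10th uncommitted offset after the committed offset is 10, so retries may
            // cover offsets 1..10 but never 11..14, and the per-partition worst case stays at:
            int worstCaseUncommitted = maxUncommittedOffsets + maxPollRecords - 1;
            System.out.println(worstCaseUncommitted); // 14
        }
    }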
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 66216dd60bd..eaca82425de 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -50,7 +50,9 @@
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+
+import java.util.regex.Pattern;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -79,12 +81,15 @@ public class SingleTopicKafkaSpoutTest {
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
-            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
-                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
-            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
-            .build();
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+                KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                    Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                    maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+                .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
         this.spout = new KafkaSpout<>(spoutConfig, (ignored) -> consumerSpy);
     }
@@ -93,7 +98,7 @@ private void prepareSpout(int messageCount) throws Exception {
         SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
-    
+
     /*
     * Asserts that commitSync has been called once,
     * that there are only commits on one topic,
@@ -161,7 +166,7 @@ public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() {
     }
 
     @Test
-    public void shouldContinueWithSlowDoubleAcks() throws Exception {
+    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 20;
             prepareSpout(messageCount);
@@ -200,7 +205,7 @@ public void shouldContinueWithSlowDoubleAcks() throws Exception {
     }
 
     @Test
-    public void shouldEmitAllMessages() throws Exception {
+    public void testShouldEmitAllMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -228,7 +233,7 @@ public void shouldEmitAllMessages() throws Exception {
     }
 
     @Test
-    public void shouldReplayInOrderFailedMessages() throws Exception {
+    public void testShouldReplayInOrderFailedMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -269,7 +274,7 @@ public void shouldReplayInOrderFailedMessages() throws Exception {
     }
 
     @Test
-    public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -313,7 +318,7 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
     }
 
     @Test
-    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
         //The spout must reemit retriable tuples, even if they fail out of order.
         //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
         int messageCount = 10;
@@ -347,7 +352,7 @@ public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
     }
 
     @Test
-    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
         //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
         int messageCount = 1;
         prepareSpout(messageCount);
@@ -367,4 +372,22 @@ public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
         spout.nextTuple();
         verify(collector, never()).emit(any(), any(), any());
     }
+
+    @Test
+    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
+        try (SimulatedTime time = new SimulatedTime()) {
+            SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+
+            //Nothing is assigned yet, should emit nothing
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+            SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+            Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);
+
+            //The new partition should be discovered and the message should be emitted
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+    }
 }
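
testSpoutMustRefreshPartitionsEvenIfNotPolling exercises the default discovery period; when a test or topology needs a different cadence, the builder exposes it directly. A hypothetical configuration (broker, topic and period are illustrative):

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class PartitionRefreshSketch {
        public static void main(String[] args) {
            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("localhost:9092", "some-topic") // placeholders
                    .setPartitionRefreshPeriodMs(2_000) // how often new partitions are discovered
                    .build();
        }
    }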
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index f43616dee99..ad9bd755785 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -51,26 +51,39 @@ public class SpoutWithMockedConsumerSetupHelper {
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
+
+        stubAssignment(contextMock, consumerMock, assignedPartitions);
+
+        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        verify(consumerMock).assign(assignedPartitions);
+
+        return spout;
+    }
+
+    /**
+     * Sets up the mocked context and consumer to appear to have the given partition assignment.
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param contextMock The mocked topology context
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to the consumer
+     */
+    public static <K, V> void stubAssignment(TopologyContext contextMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
         Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream()
             .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null))
             .collect(Collectors.groupingBy(info -> info.topic()));
         partitionInfos.keySet()
             .forEach(key -> when(consumerMock.partitionsFor(key))
                 .thenReturn(partitionInfos.get(key)));
 
-        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
-        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
         when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         when(contextMock.getThisTaskIndex()).thenReturn(0);
 
-        spout.open(topoConf, contextMock, collectorMock);
-        spout.activate();
-
-        verify(consumerMock).assign(assignedPartitions);
-
-        return spout;
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
     }
 
     /**
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 93a771d7bb4..3670d8acd81 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -41,7 +41,7 @@ public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
         return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
     }
 
-    private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+    public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
         return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
                 new Fields("topic", "key", "value"), STREAM)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index abbacf904c5..3acc25297f0 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -170,4 +170,19 @@ private void emitAndAckMessage(KafkaSpoutMessageId msgId) {
         manager.addToAckMsgs(msgId);
     }
 
+    @Test
+    public void testGetNthUncommittedOffsetAfterCommittedOffset() {
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        manager.addToEmitMsgs(initialFetchOffset + 2);
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 30);
+
+        assertThat("The third uncommitted offset should be 5", manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 5L));
+        assertThat("The fourth uncommitted offset should be 30", manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 30L));
+
+        expect.expect(NoSuchElementException.class);
+        manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+    }
+
 }