From d9fe768a9f7cb47bb2b75a26e81676fc69b1fde0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sat, 2 Sep 2017 23:50:33 +0200 Subject: [PATCH] STORM-2546: Fix storm-kafka-client spout getting stuck when retriable offsets were deleted from the Kafka log due to topic compaction --- .../apache/storm/kafka/spout/KafkaSpout.java | 48 +++- .../storm/kafka/spout/KafkaSpoutConfig.java | 21 ++ .../kafka/spout/KafkaSpoutCommitTest.java | 113 --------- .../kafka/spout/KafkaSpoutConfigTest.java | 24 ++ .../storm/kafka/spout/KafkaSpoutEmitTest.java | 9 +- .../KafkaSpoutLogCompactionSupportTest.java | 223 ++++++++++++++++++ .../KafkaSpoutMessagingGuaranteeTest.java | 8 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 3 +- .../SpoutWithMockedConsumerSetupHelper.java | 66 +++++- .../spout/internal/OffsetManagerTest.java | 1 - 10 files changed, 378 insertions(+), 138 deletions(-) delete mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 51d81c7c62e..4cda53cb7e6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -91,7 +91,7 @@ public class KafkaSpout extends BaseRichSpout { // Always empty if processing guarantee is none or at-most-once private transient Set emitted; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() - private transient Iterator> waitingToEmit; + private transient Iterator> waitingToEmit; // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; @@ -152,7 +152,7 @@ public void onPartitionsRevoked(Collection partitions) { previousAssignment = partitions; LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); @@ -271,7 +271,7 @@ private boolean commit() { private Set poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); - + if (waitingToEmit()) { LOG.debug("Not polling. 
Tuples waiting to be emitted."); return Collections.emptySet(); @@ -317,7 +317,7 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) { // ======== poll ========= private ConsumerRecords pollKafkaBroker(Set pollablePartitions) { - doSeekRetriableTopicPartitions(pollablePartitions); + final Map retriableOffsets = doSeekRetriableTopicPartitions(pollablePartitions); Set pausedPartitions = new HashSet<>(kafkaConsumer.assignment()); Iterator pausedIter = pausedPartitions.iterator(); while (pausedIter.hasNext()) { @@ -328,6 +328,7 @@ private ConsumerRecords pollKafkaBroker(Set pollablePartit try { kafkaConsumer.pause(pausedPartitions); final ConsumerRecords consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + ackRetriableOffsetsIfCompactedAway(retriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka.", numPolledRecords); @@ -341,13 +342,42 @@ private ConsumerRecords pollKafkaBroker(Set pollablePartit } } - private void doSeekRetriableTopicPartitions(Set pollablePartitions) { + private Map doSeekRetriableTopicPartitions(Set pollablePartitions) { final Map retriableTopicPartitions = retryService.earliestRetriableOffsets(); - + for (TopicPartition tp : retriableTopicPartitions.keySet()) { + if (!pollablePartitions.contains(tp)) { + retriableTopicPartitions.remove(tp); + } + } for (Entry retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) { - if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) { - //Seek directly to the earliest retriable message for each retriable topic partition - kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue()); + //Seek directly to the earliest retriable message for each retriable topic partition + kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue()); + } + return retriableTopicPartitions; + } + + private void ackRetriableOffsetsIfCompactedAway(Map earliestRetriableOffsets, + ConsumerRecords consumerRecords) { + for (Entry entry : earliestRetriableOffsets.entrySet()) { + TopicPartition tp = entry.getKey(); + List> records = consumerRecords.records(tp); + if (!records.isEmpty()) { + ConsumerRecord record = records.get(0); + long seekOffset = entry.getValue(); + long earliestReceivedOffset = record.offset(); + if (seekOffset < earliestReceivedOffset) { + //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away. 
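For context on why retriable records can vanish: on a compacted topic Kafka retains only the newest record per key, so an offset the spout wants to retry may simply no longer exist in the log. A toy illustration of that semantics (plain Java, not Kafka code; keys and offsets are made up):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.TreeSet;

    // Toy model of log compaction: the broker keeps only the latest offset per key,
    // so earlier offsets for a reused key disappear and can never be polled again.
    public class CompactionToy {
        public static void main(String[] args) {
            String[] keysByOffset = {"a", "b", "a", "c", "a"}; // records at offsets 0..4
            Map<String, Long> survivingOffsetPerKey = new LinkedHashMap<>();
            for (long offset = 0; offset < keysByOffset.length; offset++) {
                survivingOffsetPerKey.put(keysByOffset[(int) offset], offset);
            }
            // Prints [1, 3, 4]: offsets 0 and 2 are gone, so a tuple that failed there
            // could never be re-emitted and would block the spout's commits forever.
            System.out.println(new TreeSet<>(survivingOffsetPerKey.values()));
        }
    }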
+ //Ack up to the first offset received if the record is not already acked or currently in the topology + for (long i = seekOffset; i < earliestReceivedOffset; i++) { + KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null)); + if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) { + LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp); + retryService.remove(msgId); + emitted.add(msgId); + ack(msgId); + } + } + } } } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 8ff21b67c2d..58d4753b347 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -728,6 +728,27 @@ private static void setAutoCommitMode(Builder builder) { if (builder.processingGuarantee == ProcessingGuarantee.NONE) { builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); } else { + String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { + if (autoOffsetResetPolicy == null) { + /* + If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured + for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range + error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer + requests an offset that was deleted. + */ + builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { + LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." + + " Some messages may be skipped."); + } + } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { + if (autoOffsetResetPolicy != null + && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { + LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." + + " Some messages may be processed more than once."); + } + } builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java deleted file mode 100644 index 95e3b60e554..00000000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
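The comparison above between the offset the spout seeked to and the first offset Kafka actually returned is the heart of the fix. A minimal, self-contained sketch of that idea, using plain Set<Long> stand-ins for the spout's RetryService, OffsetManager and emitted-tuple tracking (illustrative only, not the real KafkaSpout code):

    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    public class CompactedOffsetAcker {

        public static void ackCompactedAway(TopicPartition tp,
                                            long seekOffset,
                                            ConsumerRecords<String, String> polled,
                                            Set<Long> ackedOrInFlight,
                                            Set<Long> ackedAsCompacted) {
            List<ConsumerRecord<String, String>> records = polled.records(tp);
            if (records.isEmpty()) {
                return; // nothing came back, so we cannot tell whether the offsets are gone
            }
            long earliestReceivedOffset = records.get(0).offset();
            // We asked Kafka to start at seekOffset, but the first record returned is later:
            // everything in between has been compacted away and will never be polled again.
            for (long offset = seekOffset; offset < earliestReceivedOffset; offset++) {
                if (!ackedOrInFlight.contains(offset)) {
                    ackedAsCompacted.add(offset); // the real spout calls ack(msgId) here
                }
            }
        }
    }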
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InOrder; -import org.mockito.MockitoAnnotations; - -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; - -public class KafkaSpoutCommitTest { - - private final long offsetCommitPeriodMs = 2_000; - private final TopologyContext contextMock = mock(TopologyContext.class); - private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); - private final Map conf = new HashMap<>(); - private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); - private KafkaConsumer consumerMock; - private KafkaSpoutConfig spoutConfig; - - @Captor - private ArgumentCaptor> commitCapture; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - consumerMock = mock(KafkaConsumer.class); - } - - @Test - public void testCommitSuccessWithOffsetVoids() { - //Verify that the commit logic can handle offset voids - try (SimulatedTime simulatedTime = new SimulatedTime()) { - Set assignedPartitions = Collections.singleton(partition); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, assignedPartitions); - Map>> records = new HashMap<>(); - List> recordsForPartition = new ArrayList<>(); - // Offsets emitted are 0,1,2,3,4,,8,9 - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); - records.put(partition, recordsForPartition); - - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<>(records)); - - for (int i = 0; i < recordsForPartition.size(); i++) { - spout.nextTuple(); - } - - ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); - - for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { - spout.ack(messageId); - } - - // Advance time and then trigger first call to kafka consumer commit; the commit must progress to 
offset 9 - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - Map>> emptyConsumerRecords = Collections.emptyMap(); - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<>(emptyConsumerRecords)); - spout.nextTuple(); - - InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - - //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at - Map commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - assertEquals(10, commits.get(partition).offset()); - } - } - -} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 051d2122c3e..59135e4d8dd 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,8 +17,11 @@ */ package org.apache.storm.kafka.spout; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -38,6 +41,7 @@ public void testBasic() { HashMap expected = new HashMap<>(); expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); assertEquals(expected, conf.getKafkaProps()); @@ -51,4 +55,24 @@ public void testSetEmitNullTuplesToTrue() { assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples()); } + + @Test + public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() { + KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + + assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue()); + } + + @Test + public void testWillRespectExplicitAutoOffsetResetPolicy() { + KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") + .build(); + + assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", + (String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none")); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 5d4252e8191..677daeabbbb 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -74,7 
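To make the defaulting behaviour verified by the new KafkaSpoutConfigTest cases concrete, a small usage sketch (assuming the builder defaults exercised by testBasic, where at-least-once is the default processing guarantee; the class name is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class AutoOffsetResetDefaultingExample {
        public static void main(String[] args) {
            // No explicit policy: with the default at-least-once guarantee the builder fills in "earliest"
            KafkaSpoutConfig defaulted = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
            System.out.println(defaulted.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // earliest

            // An explicit choice is respected; policies other than "earliest" or "none" would only log a warning
            KafkaSpoutConfig explicit = KafkaSpoutConfig.builder("localhost:1234", "topic")
                .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
                .build();
            System.out.println(explicit.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // none
        }
    }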
+74,7 @@ public void setUp() { public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); @@ -92,7 +92,7 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); int numRecords = spoutConfig.getMaxUncommittedOffsets(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls @@ -139,10 +139,7 @@ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() { //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit. try (SimulatedTime simulatedTime = new SimulatedTime()) { TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); - Set partitions = new HashSet<>(); - partitions.add(partition); - partitions.add(partitionTwo); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); Map>> records = new HashMap<>(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java new file mode 100644 index 00000000000..855d2534256 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -0,0 +1,223 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.is; + +import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; + +public class KafkaSpoutLogCompactionSupportTest { + + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer consumerMock; + private KafkaSpoutConfig spoutConfig; + + @Captor + private ArgumentCaptor> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testCommitSuccessWithOffsetVoids() { + //Verify that the commit logic can handle offset voids due to log compaction + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map>> records = new HashMap<>(); + List> recordsForPartition = new ArrayList<>(); + // Offsets emitted are 0,1,2,3,4,,8,9 + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); + records.put(partition, recordsForPartition); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < recordsForPartition.size(); i++) { + spout.nextTuple(); + } + + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); + + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { + spout.ack(messageId); + } + + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 + 
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords(Collections.>>emptyMap())); + spout.nextTuple(); + + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); + inOrder.verify(consumerMock).poll(anyLong()); + + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at + Map commits = commitCapture.getValue(); + assertTrue(commits.containsKey(partition)); + assertEquals(10, commits.get(partition).offset()); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() { + /* + Verify that failed offsets will only retry if the corresponding message exists. + When log compaction is enabled in Kafka it is possible that a tuple can fail, + and then be impossible to retry because the message in Kafka has been deleted. + The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset. + */ + try (SimulatedTime simulatedTime = new SimulatedTime()) { + TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); + + List firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + List secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2); + reset(collectorMock); + + for(int i = 0; i < 3; i++) { + spout.fail(firstPartitionMsgIds.get(i)); + spout.fail(secondPartitionMsgIds.get(i)); + } + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away. + //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present. 
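The commit arithmetic these tests exercise can be summarized with a small illustrative helper (not the spout's real OffsetManager): the commit may advance past offsets that were never emitted, but it stops at the first emitted-but-unacked offset, which is why a retriable offset whose record has been compacted away must be acked before commits can progress.

    import java.util.Set;
    import java.util.TreeSet;

    public class CommittableOffsetExample {

        static long nextCommitOffset(long logEndOffset, Set<Long> emitted, Set<Long> acked) {
            long next = 0;
            while (next < logEndOffset && (!emitted.contains(next) || acked.contains(next))) {
                next++; // never-emitted voids and acked offsets both let the commit advance
            }
            return next; // the first emitted-but-unacked offset, or the log end
        }

        public static void main(String[] args) {
            Set<Long> emitted = new TreeSet<>();
            Set<Long> acked = new TreeSet<>();
            // offsets 0-4 and 8-9 were polled and acked; 5-7 were compacted away before the poll
            for (long i : new long[]{0, 1, 2, 3, 4, 8, 9}) {
                emitted.add(i);
                acked.add(i);
            }
            System.out.println(nextCommitOffset(10, emitted, acked)); // 10, as in testCommitSuccessWithOffsetVoids

            // If offset 3 had failed and its record was then compacted away, the commit
            // would be stuck at 3 until the fix above acks the missing offset.
            acked.remove(3L);
            System.out.println(nextCommitOffset(10, emitted, acked)); // 3
        }
    }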
+ Map retryOffsets = new HashMap<>(); + retryOffsets.put(partition, new int[] {2}); + retryOffsets.put(partitionTwo, new int[] {0, 1, 2}); + int expectedEmits = 4; //2 on first partition, 0-2 on second partition + List retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L)); + + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed, hasKey(partition)); + assertThat(committed, hasKey(partitionTwo)); + assertThat(committed.get(partition).offset(), is(3L)); + assertThat(committed.get(partitionTwo).offset(), is(3L)); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() { + //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally. + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + List firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + + spout.fail(firstPartitionMsgIds.get(0)); + spout.fail(firstPartitionMsgIds.get(2)); + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away. 
+ List retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2); + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending", + committed.get(partition).offset(), is(1L)); + + spout.ack(firstPartitionMsgIds.get(1)); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L)); + } + } + +} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 8613ce77dd2..95bc7288190 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -67,7 +67,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(-1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .build(); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -82,7 +82,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { } private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig spoutConfig) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, @@ -117,7 +117,7 @@ public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception { } private void doTestModeCannotReplayTuples(KafkaSpoutConfig spoutConfig) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -163,7 +163,7 @@ public void testAnyTimesModeCannotReplayTuples() throws Exception { private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig spoutConfig) { try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 24b137338b4..58cfd51619a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -79,7 +78,7 @@ public void setUp() { public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); int lastOffset = 3; int numRecords = lastOffset + 1; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index 6f6196a0eab..e808335f841 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -17,22 +17,30 @@ package org.apache.storm.kafka.spout; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; 
+import org.mockito.ArgumentCaptor; public class SpoutWithMockedConsumerSetupHelper { @@ -50,9 +58,10 @@ public class SpoutWithMockedConsumerSetupHelper { * @return The spout */ public static KafkaSpout setupSpout(KafkaSpoutConfig spoutConfig, Map topoConf, - TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer consumerMock, Set assignedPartitions) { + TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer consumerMock, TopicPartition... assignedPartitions) { + Set assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions)); - stubAssignment(contextMock, consumerMock, assignedPartitions); + stubAssignment(contextMock, consumerMock, assignedPartitionsSet); KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory() { @Override public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { @@ -64,7 +73,7 @@ public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfi spout.open(topoConf, contextMock, collectorMock); spout.activate(); - verify(consumerMock).assign(assignedPartitions); + verify(consumerMock).assign(assignedPartitionsSet); return spout; } @@ -116,4 +125,55 @@ public static List> createRecords(TopicPartition top return recordsForPartition; } + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param The Kafka key type + * @param The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param expectedEmits The number of expected emits + * @param collectorMock The collector used by the spout + * @param partition The partition to emit messages on + * @param offsetsToEmit The offsets to emit + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static List pollAndEmit(KafkaSpout spout, KafkaConsumer consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... 
offsetsToEmit) { + return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit)); + } + + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param The Kafka key type + * @param The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param collectorMock The collector used by the spout + * @param offsetsToEmit The offsets to emit per partition + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static List pollAndEmit(KafkaSpout spout, KafkaConsumer consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map offsetsToEmit) { + int totalOffsets = 0; + Map>> records = new HashMap<>(); + for (Entry entry : offsetsToEmit.entrySet()) { + TopicPartition tp = entry.getKey(); + List> tpRecords = new ArrayList<>(); + for (Integer offset : entry.getValue()) { + tpRecords.add(new ConsumerRecord(tp.topic(), tp.partition(), offset, null, null)); + totalOffsets++; + } + records.put(tp, tpRecords); + } + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < totalOffsets; i++) { + spout.nextTuple(); + } + + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture()); + return messageIds.getAllValues(); + } + } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java index 3acc25297f0..46b3c9be7f6 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertThat; import java.util.NoSuchElementException; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
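Finally, a hedged usage sketch of the reworked test helpers: setupSpout now takes the assigned partitions as varargs, and pollAndEmit stubs the consumer's poll, drives nextTuple once per offset and returns the captured message ids. The fixture objects mirror the test classes above; the wrapper class itself is illustrative:

    package org.apache.storm.kafka.spout;

    import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
    import static org.mockito.Mockito.mock;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;

    public class PollAndEmitUsageSketch {

        public static void main(String[] args) {
            // Raw types mirror the fixtures in the tests above
            TopologyContext contextMock = mock(TopologyContext.class);
            SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
            KafkaConsumer consumerMock = mock(KafkaConsumer.class);
            Map conf = new HashMap<>();
            TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
            KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(-1).build();

            // Assigned partitions are now passed as varargs
            KafkaSpout spout = SpoutWithMockedConsumerSetupHelper
                .setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);

            // Emit offsets 0-2 and fail them so they become retriable
            List<KafkaSpoutMessageId> ids = SpoutWithMockedConsumerSetupHelper
                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
            for (KafkaSpoutMessageId msgId : ids) {
                spout.fail(msgId);
            }
        }
    }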