diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3364fb080f7..425681de6f8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -91,7 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
@@ -152,7 +152,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             previousAssignment = partitions;

             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);

             if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
@@ -265,7 +265,7 @@ private boolean commit() {
     private Set<TopicPartition> poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-
+
         if (waitingToEmit()) {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
             return Collections.emptySet();
@@ -311,12 +311,13 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
     // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) {
-        doSeekRetriableTopicPartitions(pollablePartitions);
+        final Map<TopicPartition, Long> retriableOffsets = doSeekRetriableTopicPartitions(pollablePartitions);
         Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
         pausedPartitions.removeIf(pollablePartitions::contains);
         try {
             kafkaConsumer.pause(pausedPartitions);
             final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            ackRetriableOffsetsIfCompactedAway(retriableOffsets, consumerRecords);
             final int numPolledRecords = consumerRecords.count();
             LOG.debug("Polled [{}] records from Kafka.", numPolledRecords);
@@ -330,13 +331,42 @@ private ConsumerRecords pollKafkaBroker(Set pollablePartit
         }
     }

-    private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
+    private Map<TopicPartition, Long> doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
         final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();
-
+        for (TopicPartition tp : retriableTopicPartitions.keySet()) {
+            if (!pollablePartitions.contains(tp)) {
+                retriableTopicPartitions.remove(tp);
+            }
+        }
         for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
-            if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
-                //Seek directly to the earliest retriable message for each retriable topic partition
-                kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+            //Seek directly to the earliest retriable message for each retriable topic partition
+            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+        }
+        return retriableTopicPartitions;
+    }
+
+    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
+        ConsumerRecords<K, V> consumerRecords) {
+        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
+            if (!records.isEmpty()) {
+                ConsumerRecord<K, V> record = records.get(0);
+                long seekOffset = entry.getValue();
+                long earliestReceivedOffset = record.offset();
+                if (seekOffset < earliestReceivedOffset) {
+                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
+                    //Ack up to the first offset received if the record is not already acked or currently in the topology
+                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
+                        KafkaSpoutMessageId msgId = retryService.getMessageId(tp, i);
+                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
+                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
+                            retryService.remove(msgId);
+                            emitted.add(msgId);
+                            ack(msgId);
+                        }
+                    }
+                }
             }
         }
     }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d5fceb46745..ca8dcee5b89 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -38,6 +39,8 @@
 import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
@@ -65,6 +68,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);

     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
@@ -439,6 +443,27 @@ private static void setAutoCommitMode(Builder<?, ?> builder) {
         if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         } else {
+            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+                if (autoOffsetResetPolicy == null) {
+                    /*
+                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                    requests an offset that was deleted.
+ */ + builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { + LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." + + " Some messages may be skipped."); + } + } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { + if (autoOffsetResetPolicy != null + && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { + LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." + + " Some messages may be processed more than once."); + } + } builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java deleted file mode 100644 index 441e649aecc..00000000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.kafka.spout; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InOrder; -import org.mockito.MockitoAnnotations; - -import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.ArgumentMatchers.anyList; - -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class KafkaSpoutCommitTest { - - private final long offsetCommitPeriodMs = 2_000; - private final TopologyContext contextMock = mock(TopologyContext.class); - private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); - private final Map conf = new HashMap<>(); - private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); - private KafkaConsumer consumerMock; - private KafkaSpoutConfig spoutConfig; - - @Captor - private ArgumentCaptor> commitCapture; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - consumerMock = mock(KafkaConsumer.class); - } - - @Test - public void testCommitSuccessWithOffsetVoids() { - //Verify that the commit logic can handle offset voids - try (SimulatedTime simulatedTime = new SimulatedTime()) { - Set assignedPartitions = Collections.singleton(partition); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, assignedPartitions); - Map>> records = new HashMap<>(); - List> recordsForPartition = new ArrayList<>(); - // Offsets emitted are 0,1,2,3,4,,8,9 - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); - records.put(partition, recordsForPartition); - - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<>(records)); - - for (int i = 0; i < recordsForPartition.size(); i++) { - spout.nextTuple(); - } - - ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); - - for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { - spout.ack(messageId); - } - - // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - when(consumerMock.poll(anyLong())) 
- .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); - spout.nextTuple(); - - InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - - //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at - Map commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - assertEquals(10, commits.get(partition).offset()); - } - } - -} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 051d2122c3e..e42719b0e00 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,8 +17,11 @@ */ package org.apache.storm.kafka.spout; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -38,6 +41,7 @@ public void testBasic() { HashMap expected = new HashMap<>(); expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); assertEquals(expected, conf.getKafkaProps()); @@ -51,4 +55,24 @@ public void testSetEmitNullTuplesToTrue() { assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples()); } + + @Test + public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() { + KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + + assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue()); + } + + @Test + public void testWillRespectExplicitAutoOffsetResetPolicy() { + KafkaSpoutConfig conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") + .build(); + + assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none")); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 7cfd6b78c2e..91831e4fe39 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -74,7 +74,7 @@ public void setUp() { public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to 
be able to throttle the spout according to maxSpoutPending - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); @@ -92,7 +92,7 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); int numRecords = spoutConfig.getMaxUncommittedOffsets(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls @@ -139,10 +139,7 @@ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() { //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit. try (SimulatedTime simulatedTime = new SimulatedTime()) { TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); - Set partitions = new HashSet<>(); - partitions.add(partition); - partitions.add(partitionTwo); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); Map>> records = new HashMap<>(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java new file mode 100644 index 00000000000..b824d07f4a1 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -0,0 +1,221 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.hamcrest.CoreMatchers.is; + +import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class KafkaSpoutLogCompactionSupportTest { + + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer consumerMock; + private KafkaSpoutConfig spoutConfig; + + @Captor + private ArgumentCaptor> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testCommitSuccessWithOffsetVoids() { + //Verify that the commit logic can handle offset voids due to log compaction + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map>> records = new HashMap<>(); + List> recordsForPartition = new ArrayList<>(); + // Offsets emitted are 0,1,2,3,4,,8,9 + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); + records.put(partition, recordsForPartition); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < recordsForPartition.size(); i++) { + spout.nextTuple(); + } + + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); + + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { + spout.ack(messageId); + } + + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to 
offset 9 + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + spout.nextTuple(); + + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); + inOrder.verify(consumerMock).poll(anyLong()); + + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at + Map commits = commitCapture.getValue(); + assertTrue(commits.containsKey(partition)); + assertEquals(10, commits.get(partition).offset()); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() { + /* + Verify that failed offsets will only retry if the corresponding message exists. + When log compaction is enabled in Kafka it is possible that a tuple can fail, + and then be impossible to retry because the message in Kafka has been deleted. + The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset. + */ + try (SimulatedTime simulatedTime = new SimulatedTime()) { + TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); + + List firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + List secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2); + reset(collectorMock); + + for(int i = 0; i < 3; i++) { + spout.fail(firstPartitionMsgIds.get(i)); + spout.fail(secondPartitionMsgIds.get(i)); + } + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away. + //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present. 
+ Map retryOffsets = new HashMap<>(); + retryOffsets.put(partition, new int[] {2}); + retryOffsets.put(partitionTwo, new int[] {0, 1, 2}); + int expectedEmits = 4; //2 on first partition, 0-2 on second partition + List retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L)); + + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed, hasKey(partition)); + assertThat(committed, hasKey(partitionTwo)); + assertThat(committed.get(partition).offset(), is(3L)); + assertThat(committed.get(partitionTwo).offset(), is(3L)); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() { + //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally. + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + List firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + + spout.fail(firstPartitionMsgIds.get(0)); + spout.fail(firstPartitionMsgIds.get(2)); + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away. 
+ List retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2); + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending", + committed.get(partition).offset(), is(1L)); + + spout.ack(firstPartitionMsgIds.get(1)); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L)); + } + } + +} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 07ee2dceb06..0948957d5ac 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -67,7 +67,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(-1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .build(); - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -82,7 +82,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { } private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig spoutConfig) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, @@ -117,7 +117,7 @@ public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception { } private void doTestModeCannotReplayTuples(KafkaSpoutConfig spoutConfig) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -163,7 +163,7 @@ public void testAnyTimesModeCannotReplayTuples() throws Exception { private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig spoutConfig) { try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 1cad6c2542f..2f06db388ad 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -79,7 +78,7 @@ public void setUp() { public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map>> records = new HashMap<>(); int lastOffset = 3; int numRecords = lastOffset + 1; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index ad9bd755785..407ab84185b 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -17,22 +17,32 @@ package org.apache.storm.kafka.spout; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import 
org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; +import org.mockito.ArgumentCaptor; public class SpoutWithMockedConsumerSetupHelper { @@ -50,16 +60,17 @@ public class SpoutWithMockedConsumerSetupHelper { * @return The spout */ public static KafkaSpout setupSpout(KafkaSpoutConfig spoutConfig, Map topoConf, - TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer consumerMock, Set assignedPartitions) { + TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer consumerMock, TopicPartition... assignedPartitions) { + Set assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions)); - stubAssignment(contextMock, consumerMock, assignedPartitions); + stubAssignment(contextMock, consumerMock, assignedPartitionsSet); KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; KafkaSpout spout = new KafkaSpout<>(spoutConfig, consumerFactory); spout.open(topoConf, contextMock, collectorMock); spout.activate(); - verify(consumerMock).assign(assignedPartitions); + verify(consumerMock).assign(assignedPartitionsSet); return spout; } @@ -104,4 +115,55 @@ public static List> createRecords(TopicPartition top return recordsForPartition; } + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param The Kafka key type + * @param The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param expectedEmits The number of expected emits + * @param collectorMock The collector used by the spout + * @param partition The partition to emit messages on + * @param offsetsToEmit The offsets to emit + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static List pollAndEmit(KafkaSpout spout, KafkaConsumer consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... 
offsetsToEmit) { + return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit)); + } + + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param The Kafka key type + * @param The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param collectorMock The collector used by the spout + * @param offsetsToEmit The offsets to emit per partition + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static List pollAndEmit(KafkaSpout spout, KafkaConsumer consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map offsetsToEmit) { + int totalOffsets = 0; + Map>> records = new HashMap<>(); + for (Entry entry : offsetsToEmit.entrySet()) { + TopicPartition tp = entry.getKey(); + List> tpRecords = new ArrayList<>(); + for (Integer offset : entry.getValue()) { + tpRecords.add(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null)); + totalOffsets++; + } + records.put(tp, tpRecords); + } + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < totalOffsets; i++) { + spout.nextTuple(); + } + + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture()); + return messageIds.getAllValues(); + } + } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java index 3acc25297f0..46b3c9be7f6 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertThat; import java.util.NoSuchElementException; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
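
Taken together, the production change above comes down to one rule: offsets the spout seeks to for retry but that the broker no longer returns must be treated as acked, otherwise the committed offset can never advance past a compacted-away gap. The following standalone sketch is not part of the patch; CompactionSkipSketch, offsetsToAckAsCompacted and the ackedOrInFlight predicate are hypothetical names standing in for the offsetManagers/emitted checks, and the logic is a simplified restatement of the idea, not the spout's implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

final class CompactionSkipSketch {

    // Offsets in [seekOffset, earliestReceivedOffset) were requested from the broker but not
    // returned, so they must have been compacted away or deleted. Any of them that is not already
    // acked and not still in flight should be acked so the committed offset can move past the gap.
    static List<Long> offsetsToAckAsCompacted(long seekOffset, long earliestReceivedOffset,
                                              LongPredicate ackedOrInFlight) {
        List<Long> compactedAway = new ArrayList<>();
        for (long offset = seekOffset; offset < earliestReceivedOffset; offset++) {
            if (!ackedOrInFlight.test(offset)) {
                compactedAway.add(offset);
            }
        }
        return compactedAway;
    }

    public static void main(String[] args) {
        // Mirrors testWillSkipRetriableTuplesIfOffsetsAreCompactedAway: offsets 0-2 failed, 0 and 1
        // were compacted away, so a poll that seeks to 0 returns 2 first and 0-1 get acked.
        System.out.println(offsetsToAckAsCompacted(0, 2, offset -> false)); // prints [0, 1]
    }
}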
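The KafkaSpoutConfig change pairs with this: with the at-least-once guarantee the builder now defaults auto.offset.reset to "earliest", so an out-of-range (for example deleted) offset is retried from the oldest available record instead of silently jumping to the latest one, while an explicitly configured policy is respected and only warned about when it could weaken the guarantee. A minimal usage sketch under those assumptions; the broker address, topic, and class name are placeholders, not part of the patch.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class AutoOffsetResetDefaultExample {
    public static void main(String[] args) {
        // The default guarantee is at-least-once, so the builder fills in auto.offset.reset=earliest.
        KafkaSpoutConfig<String, String> atLeastOnce = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")
            .build();
        System.out.println(atLeastOnce.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // earliest

        // An explicitly set policy is kept as-is; under at-least-once, 'earliest' and 'none' are
        // accepted silently, any other value only triggers a warning in the log.
        KafkaSpoutConfig<String, String> explicit = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")
            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
            .build();
        System.out.println(explicit.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // none
    }
}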