@@ -91,7 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Always empty if processing guarantee is none or at-most-once
private transient Set<KafkaSpoutMessageId> emitted;
// Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
// Triggers when a subscription should be refreshed
private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
@@ -152,7 +152,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
previousAssignment = partitions;

LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
            kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);

if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
@@ -271,7 +271,7 @@ private boolean commit() {

private Set<TopicPartition> poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();

if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return Collections.emptySet();
@@ -317,7 +317,7 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {

// ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) {
-        doSeekRetriableTopicPartitions(pollablePartitions);
+        final Map<TopicPartition, Long> retriableOffsets = doSeekRetriableTopicPartitions(pollablePartitions);
Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
while (pausedIter.hasNext()) {
@@ -328,6 +328,7 @@ private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartit
try {
kafkaConsumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(retriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka.",
numPolledRecords);
@@ -341,13 +342,42 @@ private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartit
}
}

-    private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
+    private Map<TopicPartition, Long> doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
        final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();

+        //Drop retriable offsets for partitions that are not currently pollable, so only pollable partitions are seeked below
+        Iterator<TopicPartition> retriableTopicPartitionsIter = retriableTopicPartitions.keySet().iterator();
+        while (retriableTopicPartitionsIter.hasNext()) {
+            if (!pollablePartitions.contains(retriableTopicPartitionsIter.next())) {
+                retriableTopicPartitionsIter.remove();
+            }
+        }
        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
-            if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
-                //Seek directly to the earliest retriable message for each retriable topic partition
-                kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
-            }
+            //Seek directly to the earliest retriable message for each retriable topic partition
+            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
        }
+        return retriableTopicPartitions;
    }

private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
ConsumerRecords<K, V> consumerRecords) {
for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
if (!records.isEmpty()) {
ConsumerRecord<K, V> record = records.get(0);
long seekOffset = entry.getValue();
long earliestReceivedOffset = record.offset();
if (seekOffset < earliestReceivedOffset) {
//Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
//Ack up to the first offset received if the record is not already acked or currently in the topology
for (long i = seekOffset; i < earliestReceivedOffset; i++) {
KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
retryService.remove(msgId);
emitted.add(msgId);
ack(msgId);
}
}
}
}
}
}
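
The heart of the new ackRetriableOffsetsIfCompactedAway logic is an offset-gap check: the spout seeked to a retriable offset, but the first record Kafka actually returned for that partition starts later, so everything in between no longer exists on the broker (compacted or deleted) and would otherwise be retried forever. A minimal, self-contained sketch of that gap reasoning (the class and method names below are illustrative, not part of this PR):

```java
import java.util.ArrayList;
import java.util.List;

//Illustrative sketch only, not part of the PR: offsets in [seekOffset, firstReceivedOffset)
//were requested from the broker but never returned, so they were compacted or deleted and
//can safely be acked instead of being retried forever.
public class CompactedOffsetGapExample {

    static List<Long> offsetsToAckAsCompactedAway(long seekOffset, long firstReceivedOffset) {
        List<Long> skipped = new ArrayList<>();
        for (long offset = seekOffset; offset < firstReceivedOffset; offset++) {
            skipped.add(offset);
        }
        return skipped;
    }

    public static void main(String[] args) {
        //The spout seeked to offset 5, but the earliest record returned was at offset 8,
        //so offsets 5, 6 and 7 no longer exist on the broker.
        System.out.println(offsetsToAckAsCompactedAway(5, 8)); // prints [5, 6, 7]
    }
}
```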
@@ -728,6 +728,27 @@ private static void setAutoCommitMode(Builder<?, ?> builder) {
if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
} else {
String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
if (autoOffsetResetPolicy == null) {
/*
If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
requests an offset that was deleted.
*/
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
} else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+ " Some messages may be skipped.");
}
} else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
if (autoOffsetResetPolicy != null
&& (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+ " Some messages may be processed more than once.");
}
}
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
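
The effect of the new setAutoCommitMode branch is easiest to see through the builder: with at-least-once and no explicit policy the spout fills in "earliest", while an explicitly configured value is respected. A usage sketch based on the builder calls exercised by the tests below; the broker address and topic name are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class AutoOffsetResetDefaultExample {
    public static void main(String[] args) {
        //At-least-once with no explicit policy: the builder fills in "earliest" so that an
        //offset-out-of-range error replays messages instead of silently skipping them.
        KafkaSpoutConfig<String, String> atLeastOnce = KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .build();
        System.out.println(atLeastOnce.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // earliest

        //An explicitly configured policy is left untouched rather than overwritten.
        KafkaSpoutConfig<String, String> explicitPolicy = KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
            .build();
        System.out.println(explicitPolicy.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // none
    }
}
```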

This file was deleted.

@@ -17,8 +17,11 @@
*/
package org.apache.storm.kafka.spout;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
@@ -38,6 +41,7 @@ public void testBasic() {
HashMap<String, Object> expected = new HashMap<>();
expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
assertEquals(expected, conf.getKafkaProps());
@@ -51,4 +55,24 @@ public void testSetEmitNullTuplesToTrue() {

assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples());
}

@Test
public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.build();

assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy",
conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue());
}

@Test
public void testWillRespectExplicitAutoOffsetResetPolicy() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
.build();

assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
(String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
}
}
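
The warning branch for at-most-once is not exercised by the tests above. A hedged sketch of a configuration that would hit it, assuming placeholder broker and topic values and that setAutoCommitMode runs while the config is built:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

//Sketch only, not part of the PR: at-most-once combined with auto.offset.reset=earliest keeps the
//user's value but should trigger the "Cannot guarantee at-most-once processing..." warning, since
//a reset to earliest could re-deliver records that were already processed.
public class AtMostOnceResetWarningExample {
    public static void main(String[] args) {
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .build();
        //The explicit policy is preserved; only the warning is logged.
        System.out.println(conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); // earliest
    }
}
```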
@@ -74,7 +74,7 @@ public void setUp() {
public void testNextTupleEmitsAtMostOneTuple() {
//The spout should emit at most one message per call to nextTuple
//This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
-        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10));

@@ -92,7 +92,7 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee

//Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
int numRecords = spoutConfig.getMaxUncommittedOffsets();
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
@@ -139,10 +139,7 @@ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
//This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
try (SimulatedTime simulatedTime = new SimulatedTime()) {
TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
-            Set<TopicPartition> partitions = new HashSet<>();
-            partitions.add(partition);
-            partitions.add(partitionTwo);
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));