Merged
@@ -91,9 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Always empty if processing guarantee is none or at-most-once
private transient Set<KafkaSpoutMessageId> emitted;
// Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
// Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
private transient long numUncommittedOffsets;
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
// Triggers when a subscription should be refreshed
private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
@@ -115,7 +113,6 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC

// Spout internals
this.collector = collector;
numUncommittedOffsets = 0;

// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -227,13 +224,19 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
public void nextTuple() {
try {
if (initialized) {

if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaSpoutConfig.getSubscription().refreshAssignment();
}

if (commit()) {
commitOffsetsForAckedTuples();
}

if (poll()) {
Set<TopicPartition> pollablePartitions = poll();
if (!pollablePartitions.isEmpty()) {
try {
setWaitingToEmit(pollKafkaBroker());
setWaitingToEmit(pollKafkaBroker(pollablePartitions));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
@@ -260,27 +263,38 @@ private boolean commit() {
return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
}

private boolean poll() {
private Set<TopicPartition> poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
final int readyMessageCount = retryService.readyMessageCount();
final boolean poll = !waitingToEmit()
// Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
// Accounting for retriable tuples in this way still guarantees that the limit is followed on a per partition basis,
// and prevents locking up the spout when there are too many retriable tuples
&& (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || !isAtLeastOnceProcessing());

if (!poll) {
if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted."
+ " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
}

if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnceProcessing()) {
LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
numUncommittedOffsets, maxUncommittedOffsets);

if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return Collections.emptySet();
}
Set<TopicPartition> assignment = kafkaConsumer.assignment();
if (!isAtLeastOnceProcessing()) {
return assignment;
}
Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
for (TopicPartition tp : assignment) {
OffsetManager offsetManager = offsetManagers.get(tp);
int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
if (numUncommittedOffsets < maxUncommittedOffsets) {
//Allow poll if the partition is not at the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
//Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
numUncommittedOffsets, maxUncommittedOffsets);
}
}
}
return poll;
return pollablePartitions;
}

private boolean waitingToEmit() {
@@ -296,28 +310,34 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
}

// ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker() {
doSeekRetriableTopicPartitions();
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaSpoutConfig.getSubscription().refreshAssignment();
}
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
numPolledRecords, numUncommittedOffsets);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
kafkaConsumer.commitSync();
private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) {
doSeekRetriableTopicPartitions(pollablePartitions);
Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
pausedPartitions.removeIf(pollablePartitions::contains);
try {
kafkaConsumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka.",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
kafkaConsumer.commitSync();
}
return consumerRecords;
} finally {
kafkaConsumer.resume(pausedPartitions);
}
return consumerRecords;
}

private void doSeekRetriableTopicPartitions() {
private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();

for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
//Seek directly to the earliest retriable message for each retriable topic partition
kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
//Seek directly to the earliest retriable message for each retriable topic partition
kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}
}

@@ -367,8 +387,6 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
} else { //New tuple, hence increment the uncommitted offset counter
numUncommittedOffsets++;
}
collector.emit(stream, tuple, msgId);
tupleListener.onEmit(tuple, msgId);
@@ -429,10 +447,8 @@ private void commitOffsetsForAckedTuples() {


final OffsetManager offsetManager = offsetManagers.get(tp);
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
LOG.debug("[{}] uncommitted offsets across all topic partitions",
numUncommittedOffsets);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
}
} else {
LOG.trace("No offsets to commit. {}", this);
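The pause/resume dance introduced in pollKafkaBroker above is the core of the new per-partition backpressure. The following is a minimal standalone sketch, not the spout's code: it assumes a plain KafkaConsumer, an externally computed set of pollable partitions, and the poll(long) overload used in the diff.

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PausedPollSketch {
    // Polls only the given pollable partitions by pausing every other assigned partition,
    // mirroring the pause/resume pattern in pollKafkaBroker above.
    static <K, V> ConsumerRecords<K, V> pollOnlyPollable(KafkaConsumer<K, V> consumer,
            Set<TopicPartition> pollablePartitions, long pollTimeoutMs) {
        Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
        paused.removeAll(pollablePartitions);
        try {
            consumer.pause(paused);
            // Only the unpaused (pollable) partitions can contribute records to this poll.
            return consumer.poll(pollTimeoutMs);
        } finally {
            // Resume in a finally block so a failed poll cannot leave partitions paused.
            consumer.resume(paused);
        }
    }
}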
@@ -253,11 +253,12 @@ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
}

/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this
* limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets
* below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
* but no partition will exceed this limit by more than maxPollRecords - 1.
*
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
* of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
* This limit is per partition and may in some cases be exceeded,
* but each partition cannot exceed this limit by more than maxPollRecords - 1.
*
* <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
*
* @param maxUncommittedOffsets max number of records that can be pending commit
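As a rough usage sketch of the setting documented above: the broker address, topic name, and chosen values are placeholders, and setProcessingGuarantee is assumed to be the builder method behind the ProcessingGuarantee enum referenced in the Javadoc.

import org.apache.storm.kafka.spout.KafkaSpoutConfig;

class MaxUncommittedOffsetsExample {
    static KafkaSpoutConfig<String, String> buildConfig() {
        // Placeholder broker address and topic.
        return KafkaSpoutConfig.builder("localhost:9092", "my-topic")
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            // After this change the limit applies per partition: each assigned partition may
            // have at most 500 emitted-but-uncommitted offsets before it stops being polled.
            .setMaxUncommittedOffsets(500)
            .setOffsetCommitPeriodMs(10_000)
            .build();
    }
}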
@@ -20,6 +20,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +66,27 @@ public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N)
public void addToEmitMsgs(long offset) {
this.emittedOffsets.add(offset); // O(Log N)
}

public int getNumUncommittedOffsets() {
return this.emittedOffsets.size();
}

/**
* Gets the offset of the nth emitted message after the committed offset.
* Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been emitted,
* getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
*
* @param index The index of the message to get the offset for
* @return The offset
* @throws NoSuchElementException if the index is out of range
*/
public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
Iterator<Long> offsetIter = emittedOffsets.iterator();
for (int i = 0; i < index - 1; i++) {
offsetIter.next();
}
Contributor: how about calling toArray() on the set and then fetching the Nth offset directly using the array index? Trade-off speed for some memory/garbage. We can possibly even pre-allocate an array of size maxUncommitted + batch size for this purpose.

Contributor (author): Is copying the entire set to an array faster than iterating the set?

Contributor: Guess we'll need to check the implementation to confirm how the array is created from the data structures behind the Set interface. Agree that it is possible for the iteration to be as fast or faster than the array creation. There may not be a clear cut winner here and it might depend on 'N'.

Contributor: Looks like an iterator is used under the covers to create the array (which explains the ordering guarantee in the array). No change required. :-)
return offsetIter.next();
}

/**
* An offset can only be committed when all emitted records with lower offset have been
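The toArray() variant raised in the review thread above was not adopted, but as a sketch of the discussed trade-off (an extra array allocation per call in exchange for a direct index lookup), it would look roughly like this:

import java.util.NavigableSet;
import java.util.NoSuchElementException;

class NthOffsetViaArraySketch {
    // Hypothetical alternative to getNthUncommittedOffsetAfterCommittedOffset; not part of this change.
    static long nthUncommittedOffset(NavigableSet<Long> emittedOffsets, int index) {
        // TreeSet.toArray() walks the set with its iterator, so ascending order is preserved,
        // but a new array is allocated on every call.
        Long[] offsets = emittedOffsets.toArray(new Long[0]);
        if (index < 1 || index > offsets.length) {
            throw new NoSuchElementException("No uncommitted offset at index " + index);
        }
        return offsets[index - 1];
    }
}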
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
@@ -75,16 +76,13 @@ public void setUp() {
public void testCommitSuccessWithOffsetVoids() {
//Verify that the commit logic can handle offset voids
try (SimulatedTime simulatedTime = new SimulatedTime()) {
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Set<TopicPartition> assignedPartitions = Collections.singleton(partition);
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, assignedPartitions);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
// Offsets emitted are 0,1,2,3,4,<void>,8,9
for (int i = 0; i < 5; i++) {
recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
for (int i = 8; i < 10; i++) {
recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5));
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2));
records.put(partition, recordsForPartition);

when(consumerMock.poll(anyLong()))
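The test above replaces the inline record-building loops with a createRecords helper on SpoutWithMockedConsumerSetupHelper. Its implementation is not part of this diff; based on the call sites and the loops it replaces, it presumably looks something like this sketch:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class CreateRecordsSketch {
    // Assumed shape of SpoutWithMockedConsumerSetupHelper.createRecords; the real helper is not shown here.
    static List<ConsumerRecord<String, String>> createRecords(TopicPartition partition, long startingOffset, int count) {
        List<ConsumerRecord<String, String>> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Same record shape as the loops this helper replaced in the test above.
            records.add(new ConsumerRecord<>(partition.topic(), partition.partition(), startingOffset + i, "key", "value"));
        }
        return records;
    }
}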