external/storm-kafka-client/pom.xml (9 changes: 7 additions & 2 deletions)

@@ -76,7 +76,13 @@
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
+            <artifactId>hamcrest-core</artifactId>
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
@@ -94,7 +100,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
-            <version>${log4j-over-slf4j.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java

@@ -25,16 +25,13 @@

 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -48,6 +45,7 @@
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -58,27 +56,27 @@

 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final long serialVersionUID = 4151921085047987154L;
+    // Initial delay for the commit and subscription refresh timers
+    public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
 
     // Storm
     protected SpoutOutputCollector collector;
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
     private transient boolean consumerAutoCommitMode;
 
 
     // Bookkeeping
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset for the first poll the spout makes upon activation
     private transient KafkaSpoutRetryService retryService;  // Class that has the logic to handle tuple failure
     private transient Timer commitTimer;  // timer == null for auto commit mode
     private transient boolean initialized;  // Flag indicating that the spout is still undergoing initialization.
     // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    transient Map<TopicPartition, OffsetEntry> acked;  // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode
+    private transient Map<TopicPartition, OffsetManager> acked;  // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;  // Tuples that have been emitted but are still "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;  // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;  // Number of offsets that have been polled and emitted but not yet committed. Not used if it's AutoCommitMode
@@ -87,13 +85,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {


     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
     }
 
     // This constructor is here for testing
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
-        this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
     }

     @Override
@@ -114,9 +112,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
         retryService = kafkaSpoutConfig.getRetryService();
 
         if (!consumerAutoCommitMode) {  // If it is auto commit, no need to commit offsets manually
-            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+            commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
-        refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         acked = new HashMap<>();
         emitted = new HashSet<>();
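
Note: TIMER_DELAY_MS gives a name to the magic number 500 that both timers previously received as their initial delay. For readers unfamiliar with the internal Timer, here is a minimal, self-contained sketch of the delay-then-period behavior assumed by the calls above; the class and the expire-check method are written from scratch, so treat the names and semantics as assumptions rather than the actual org.apache.storm.kafka.spout.internal.Timer source.

```java
import java.util.concurrent.TimeUnit;

// Sketch: first expiration after `delay`, then at most one expiration per `period`.
class PeriodicTimer {
    private final long periodNanos;
    private long nextExpiryNanos;

    PeriodicTimer(long delay, long period, TimeUnit unit) {
        this.periodNanos = unit.toNanos(period);
        this.nextExpiryNanos = System.nanoTime() + unit.toNanos(delay);
    }

    // Cheap enough to poll on every nextTuple() call: returns true once per
    // period and pushes the deadline forward when it fires.
    boolean isExpiredResetOnTrue() {
        long now = System.nanoTime();
        if (now >= nextExpiryNanos) {
            nextExpiryNanos = now + periodNanos;
            return true;
        }
        return false;
    }
}
```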
@@ -204,7 +202,7 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
     private void setAcked(TopicPartition tp, long fetchOffset) {
         // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
         if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
-            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+            acked.put(tp, new OffsetManager(tp, fetchOffset));
         }
     }

@@ -296,7 +294,7 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in the next commit cycle
             } else {
-                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);  // seek to the offset right after the last committed one
+                kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);  // seek to the offset right after the last committed one
             }
         }
     }
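
Note: both branches above seek one position past a known-processed offset, so the next poll replays exactly the records that are not yet safely committed. A small self-contained demo of that seek semantics, using the MockConsumer that ships with kafka-clients (the demo class and values are ours):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class SeekDemo {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("topic", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        long lastCommitted = 4;
        consumer.seek(tp, lastCommitted + 1);  // replay everything after the commit point
        System.out.println(consumer.position(tp));  // prints 5
    }
}
```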
@@ -353,7 +351,7 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
     private void commitOffsetsForAckedTuples() {
         // Find offsets that are ready to be committed for every topic partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -366,9 +364,14 @@ private void commitOffsetsForAckedTuples() {
LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
// Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
// in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
final OffsetEntry offsetEntry = tpOffset.getValue();
offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
final OffsetManager offsetManager = acked.get(tp);
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
LOG.debug("[{}] uncommitted offsets across all topic partitions",
numUncommittedOffsets);
}
} else {
LOG.trace("No offsets to commit. {}", this);
@@ -489,127 +492,7 @@ public Map<String, Object> getComponentConfiguration () {
     private String getTopicsString() {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
-
-    // ======= Offsets Commit Management ==========
-
-    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
-        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
-            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
-        }
-    }
-
-    /**
-     * This class is not thread safe
-     */
-    class OffsetEntry {
-        private final TopicPartition tp;
-        private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
-                                                 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset;  // last offset committed to Kafka. Initially it is set to fetchOffset - 1
-        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);  // acked messages sorted by ascending order of offset
-
-        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
-            this.tp = tp;
-            this.initialFetchOffset = initialFetchOffset;
-            this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Instantiated {}", this);
-        }
-
-        public void add(KafkaSpoutMessageId msgId) {  // O(log N)
-            ackedMsgs.add(msgId);
-        }
-
-        /**
-         * An offset is only committed when all records with lower offset have
-         * been acked. This guarantees that all offsets smaller than the
-         * committedOffset have been delivered.
-         * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
-         */
-        public OffsetAndMetadata findNextCommitOffset() {
-            boolean found = false;
-            long currOffset;
-            long nextCommitOffset = committedOffset;
-            KafkaSpoutMessageId nextCommitMsg = null;  // this is a convenience variable to make it faster to create OffsetAndMetadata
-
-            for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan over the TreeSet
-                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {  // found the next offset to commit
-                    found = true;
-                    nextCommitMsg = currAckedMsg;
-                    nextCommitOffset = currOffset;
-                } else if (currAckedMsg.offset() > nextCommitOffset + 1) {  // offset found is not contiguous with the offsets going in the next commit, so stop the search
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
-                    break;
-                } else {
-                    // Received a redundant ack. Ignore and continue processing.
-                    LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed offset [{}]",
-                            tp, currOffset, committedOffset);
-                }
-            }
-
-            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
-            if (found) {
-                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
-            } else {
-                LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
-            }
-            LOG.trace("{}", this);
-            return nextCommitOffsetAndMetadata;
-        }
-        /**
-         * Marks an offset as committed. This method has side effects: it sets the internal state in such a way that future
-         * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
-         *
-         * @param committedOffset offset to be marked as committed
-         */
-        public void commit(OffsetAndMetadata committedOffset) {
-            long numCommittedOffsets = 0;
-            if (committedOffset != null) {
-                final long oldCommittedOffset = this.committedOffset;
-                numCommittedOffsets = committedOffset.offset() - this.committedOffset;
-                this.committedOffset = committedOffset.offset();
-                for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
-                    if (iterator.next().offset() <= committedOffset.offset()) {
-                        iterator.remove();
-                    } else {
-                        break;
-                    }
-                }
-                numUncommittedOffsets -= numCommittedOffsets;
-                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
-            } else {
-                LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        numCommittedOffsets, tp, numUncommittedOffsets);
-            }
-            LOG.trace("{}", this);
-        }

-        long getCommittedOffset() {
-            return committedOffset;
-        }
-
-        public boolean isEmpty() {
-            return ackedMsgs.isEmpty();
-        }
-
-        public boolean contains(ConsumerRecord<K, V> record) {
-            return contains(new KafkaSpoutMessageId(record));
-        }
-
-        public boolean contains(KafkaSpoutMessageId msgId) {
-            return ackedMsgs.contains(msgId);
-        }
-
-        @Override
-        public String toString() {
-            return "OffsetEntry{" +
-                    "topic-partition=" + tp +
-                    ", fetchOffset=" + initialFetchOffset +
-                    ", committedOffset=" + committedOffset +
-                    ", ackedMsgs=" + ackedMsgs +
-                    '}';
-        }
-    }
 }
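
Note on the refactoring as a whole: the inner OffsetEntry class removed above now lives on as org.apache.storm.kafka.spout.internal.OffsetManager. Judging strictly by the call sites in this diff, the new class keeps the constructor, getCommittedOffset(), and findNextCommitOffset(), while commit(OffsetAndMetadata) now returns the number of offsets it committed instead of mutating the spout's numUncommittedOffsets field directly, which keeps the manager free of spout state and easier to unit test. A hedged usage sketch built only from those evidenced calls (the method for registering acked message ids is not shown in this excerpt, so this sketch takes the null branch):

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;

public class OffsetManagerSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic", 0);
        OffsetManager manager = new OffsetManager(tp, 5L);  // initial fetch offset 5

        // The committed offset starts at initialFetchOffset - 1, i.e. 4 here.
        System.out.println(manager.getCommittedOffset());

        // Ask for the next safe commit point; null until a contiguous run of acks exists.
        OffsetAndMetadata next = manager.findNextCommitOffset();
        if (next != null) {
            // commit() reports how many offsets it covered, so the caller,
            // not the manager, maintains numUncommittedOffsets.
            long committedCount = manager.commit(next);
            System.out.println(committedCount);
        }
    }
}
```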