diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index e37d5494158..55a11c3796b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.spout.SpoutOutputCollector; @@ -59,6 +60,8 @@ public class KafkaSpout extends BaseRichSpout { // Storm protected SpoutOutputCollector collector; + protected int thisTaskIndex; + protected int taskCount; // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; @@ -66,11 +69,15 @@ public class KafkaSpout extends BaseRichSpout { private transient boolean consumerAutoCommitMode; + // Bookkeeping private transient int maxRetries; // Max number of times a tuple is retried private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure private transient Timer commitTimer; // timer == null for auto commit mode + private transient Timer partitionRefreshTimer; // partitionRefreshTimer != null when in manual partition assignment mode + private transient boolean manualPartitionAssignment; + private transient KafkaSpoutConsumerRebalanceListener partitionRebalanceListener; private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. 
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() @@ -94,12 +101,16 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect // Spout internals this.collector = collector; + thisTaskIndex = context.getThisTaskIndex(); + taskCount = context.getComponentTasks(context.getThisComponentId()).size(); maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); numUncommittedOffsets = 0; // Offset management firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + manualPartitionAssignment = kafkaSpoutConfig.isManualPartitionAssign(); + partitionRebalanceListener = new KafkaSpoutConsumerRebalanceListener(); // Retries management retryService = kafkaSpoutConfig.getRetryService(); @@ -111,6 +122,10 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } + if (manualPartitionAssignment) { + partitionRefreshTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + } + acked = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = Collections.emptyListIterator(); @@ -200,16 +215,22 @@ private void setAcked(TopicPartition tp, long fetchOffset) { @Override public void nextTuple() { if (initialized) { - if (commit()) { - commitOffsetsForAckedTuples(); - } + try { + refreshPartitionIfNeeded(); - if (poll()) { - setWaitingToEmit(pollKafkaBroker()); - } + if (commit()) { + commitOffsetsForAckedTuples(); + } - if (waitingToEmit()) { - emit(); + if (poll()) { + setWaitingToEmit(pollKafkaBroker()); + } + + if (waitingToEmit()) { + emit(); + } + } catch (Exception e) { + LOG.error("Failed to emit tuples.", e); } } else { LOG.debug("Spout not initialized. 
Not sending tuples until initialization completes"); @@ -236,6 +257,35 @@ private boolean poll() { return poll; } + private void refreshPartitionIfNeeded() { + if (!manualPartitionAssignment || !partitionRefreshTimer.isExpiredResetOnTrue()) return; + doRefreshPartitions(); + } + + private void doRefreshPartitions() { + KafkaSpoutStreamsNamedTopics streams = KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams); + List partitions = KafkaUtils.readPartitions(kafkaConsumer, streams.getTopics()); + List tps = new ArrayList<>(partitions.size()); + for (PartitionInfo info: partitions) { + tps.add(new TopicPartition(info.topic(), info.partition())); + } + + Collections.sort(tps, TopicPartitionComparator.INSTANCE); + + Set myPartitions = new HashSet<>(tps.size()/taskCount + 1); + for (int i = thisTaskIndex; i < tps.size(); i += taskCount) { + myPartitions.add(tps.get(i)); + } + + Set originalPartitions = kafkaConsumer.assignment(); + + if (!originalPartitions.equals(myPartitions)) { + partitionRebalanceListener.onPartitionsRevoked(originalPartitions); + partitionRebalanceListener.onPartitionsAssigned(myPartitions); + kafkaConsumer.assign(myPartitions); + } + } + private boolean waitingToEmit() { return waitingToEmit != null && waitingToEmit.hasNext(); } @@ -262,11 +312,12 @@ private void doSeekRetriableTopicPartitions() { final Set retriableTopicPartitions = retryService.retriableTopicPartitions(); for (TopicPartition rtp : retriableTopicPartitions) { - final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); - if (offsetAndMeta != null) { - kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle - } else { - kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset + KafkaSpout.OffsetEntry entry = acked.get(rtp); + if (entry != null) { + final OffsetAndMetadata offsetAndMeta = entry.findNextCommitOffset(); + if (offsetAndMeta != null) { + 
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle + } } } } @@ -360,18 +411,22 @@ private void subscribeKafkaConsumer() { kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); - if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { - final List topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); - kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics {}", topics); - } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { - final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); - kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); + if (manualPartitionAssignment) { + doRefreshPartitions(); + } else { + if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { + final List topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); + kafkaConsumer.subscribe(topics, partitionRebalanceListener); + LOG.info("Kafka consumer subscribed topics {}", topics); + } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); + kafkaConsumer.subscribe(pattern, partitionRebalanceListener); + LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); + } + // Initial poll to get the consumer registration process going. + // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + kafkaConsumer.poll(0); } - // Initial poll to get the consumer registration process going. 
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - kafkaConsumer.poll(0); } @Override diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 7c97ac9b04c..015a9416613 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -36,6 +36,7 @@ public class KafkaSpoutConfig implements Serializable { public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case + public static final int DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s // Kafka property names public interface Consumer { @@ -73,9 +74,11 @@ public enum FirstPollOffsetStrategy { private final long pollTimeoutMs; // Kafka spout configuration + private final boolean manualPartitionAssign; private final long offsetCommitPeriodMs; private final int maxRetries; private final int maxUncommittedOffsets; + private final int partitionRefreshPeriodMs; private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutStreams kafkaSpoutStreams; private final KafkaSpoutTuplesBuilder tuplesBuilder; @@ -93,6 +96,8 @@ private KafkaSpoutConfig(Builder builder) { this.maxUncommittedOffsets = builder.maxUncommittedOffsets; this.tuplesBuilder = builder.tuplesBuilder; this.retryService = builder.retryService; + this.manualPartitionAssign = builder.manualPartitionAssign; + this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; } private Map setDefaultsAndGetKafkaProps(Map kafkaProps) { @@ -113,6 +118,8 @@ public 
static class Builder { private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; + private boolean manualPartitionAssign = false; + private int partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private final KafkaSpoutTuplesBuilder tuplesBuilder; private final KafkaSpoutRetryService retryService; @@ -230,8 +237,37 @@ public Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPol } public KafkaSpoutConfig build() { + validate(); return new KafkaSpoutConfig<>(this); } + + /** + * Defines whether the consumer manages partition manually. + * If set to true, the consumer behaves like a simple consumer, otherwise it will rely on kafka to do partition assignment. + * @param manualPartitionAssign Whether use manual partition assignment. + */ + public Builder setManualPartitionAssign(boolean manualPartitionAssign) { + this.manualPartitionAssign = manualPartitionAssign; + return this; + } + + /** + * Defines partition refresh period in the manual partition assign model. + * @param partitionRefreshPeriodMs Partition refresh period in ms. + */ + public Builder setPartitionRefreshPeriodMs(int partitionRefreshPeriodMs) { + this.partitionRefreshPeriodMs = partitionRefreshPeriodMs; + return this; + } + + /** + * Validate configs before build. 
+ */ + private void validate() { + if (this.manualPartitionAssign && kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + throw new IllegalArgumentException("Manual partition assign can't be used with wildcard topics!"); + } + } } public Map getKafkaProps() { @@ -283,6 +319,10 @@ public Pattern getTopicWildcardPattern() { null; } + public boolean isManualPartitionAssign() { + return manualPartitionAssign; + } + public int getMaxTupleRetries() { return maxRetries; } @@ -307,6 +347,10 @@ public KafkaSpoutRetryService getRetryService() { return retryService; } + public int getPartitionRefreshPeriodMs() { + return partitionRefreshPeriodMs; + } + @Override public String toString() { return "KafkaSpoutConfig{" + diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaUtils.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaUtils.java new file mode 100644 index 00000000000..b0c92cbf975 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaUtils.java @@ -0,0 +1,23 @@ +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Utility methods for reading partition metadata from a Kafka consumer. 
+ */
+public final class KafkaUtils {
+    /** @return the {@link PartitionInfo}s of every topic in {@code topics}, as reported by {@code consumer}. */
+    public static List<PartitionInfo> readPartitions(KafkaConsumer<?, ?> consumer, Iterable<String> topics) {
+        List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (String topic : topics) {
+            partitionInfos.addAll(consumer.partitionsFor(topic));
+        }
+        return partitionInfos;
+    }
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
new file mode 100644
index 00000000000..bad3cf48b93
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Comparator;
+
+/**
+ * Singleton comparator ordering {@link TopicPartition}s by topic name first, then by partition id.
+ */
+public enum TopicPartitionComparator implements Comparator<TopicPartition> {
+    INSTANCE;
+
+    @Override
+    public int compare(TopicPartition o1, TopicPartition o2) {
+        if (!o1.topic().equals(o2.topic())) {
+            return o1.topic().compareTo(o2.topic());
+        } else {
+            // Integer.compare avoids the overflow risk of subtracting ints.
+            return Integer.compare(o1.partition(), o2.partition());
+        }
+    }
+}