diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index d405c4da874..dcb245cc71d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -24,7 +24,14 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetcher; +import org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -46,16 +53,11 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; - -import 
org.apache.kafka.common.errors.InterruptException; public class KafkaSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); @@ -63,6 +65,7 @@ public class KafkaSpout extends BaseRichSpout { // Storm protected SpoutOutputCollector collector; + private TopologyContext topologyContext; // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; @@ -77,6 +80,7 @@ public class KafkaSpout extends BaseRichSpout { private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure private transient Timer commitTimer; // timer == null for auto commit mode private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. + private transient KafkaRecordsFetcher recordsFetcher; // Class that encapsulates the logic of managing partitions and fetching records // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples @@ -102,9 +106,9 @@ public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { initialized = false; - // Spout internals this.collector = collector; + this.topologyContext = context; maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); numUncommittedOffsets = 0; @@ -221,7 +225,11 @@ public void nextTuple() { } if (poll()) { - setWaitingToEmit(pollKafkaBroker()); + try { + setWaitingToEmit(pollKafkaBroker()); + } catch (RetriableException e) { + LOG.error("Failed to poll from kafka.", e); + } } if (waitingToEmit()) { @@ -277,7 +285,7 @@ public void setWaitingToEmit(ConsumerRecords consumerRecords) { private ConsumerRecords pollKafkaBroker() { doSeekRetriableTopicPartitions(); - final ConsumerRecords consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final ConsumerRecords consumerRecords = recordsFetcher.fetchRecords(kafkaSpoutConfig.getPollTimeoutMs()); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets); return consumerRecords; @@ -404,19 +412,8 @@ public void activate() { private void subscribeKafkaConsumer() { kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); - - if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { - final List topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); - kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics {}", topics); - } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { - final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); - kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); - } - // Initial poll to get the consumer registration process going. - // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - kafkaConsumer.poll(0); + recordsFetcher = KafkaRecordsFetchers.create(kafkaSpoutConfig, kafkaConsumer, topologyContext, + new KafkaSpoutConsumerRebalanceListener()); } @Override @@ -612,60 +609,4 @@ public String toString() { '}'; } } - - // =========== Timer =========== - - private class Timer { - private final long delay; - private final long period; - private final TimeUnit timeUnit; - private final long periodNanos; - private long start; - - /** - * Creates a class that mimics a single threaded timer that expires periodically. 
If a call to {@link - * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns - * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay. - * - * @param delay the initial delay before the timer starts - * @param period the period between calls {@link #isExpiredResetOnTrue()} - * @param timeUnit the time unit of delay and period - */ - public Timer(long delay, long period, TimeUnit timeUnit) { - this.delay = delay; - this.period = period; - this.timeUnit = timeUnit; - - periodNanos = timeUnit.toNanos(period); - start = System.nanoTime() + timeUnit.toNanos(delay); - } - - public long period() { - return period; - } - - public long delay() { - return delay; - } - - public TimeUnit getTimeUnit() { - return timeUnit; - } - - /** - * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the - * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset - * (re-initiated) and a new cycle will start. - * - * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false - * otherwise. 
- */ - public boolean isExpiredResetOnTrue() { - final boolean expired = System.nanoTime() - start > periodNanos; - if (expired) { - start = System.nanoTime(); - } - return expired; - } - } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 8aa525bdc77..5daf13fcee3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -36,6 +36,7 @@ public class KafkaSpoutConfig implements Serializable { public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case + public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s // Kafka property names public interface Consumer { @@ -76,6 +77,8 @@ public enum FirstPollOffsetStrategy { private final long offsetCommitPeriodMs; private final int maxRetries; private final int maxUncommittedOffsets; + private final long partitionRefreshPeriodMs; + private final boolean manualPartitionAssignment; private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutStreams kafkaSpoutStreams; private final KafkaSpoutTuplesBuilder tuplesBuilder; @@ -91,6 +94,8 @@ private KafkaSpoutConfig(Builder builder) { this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; this.kafkaSpoutStreams = builder.kafkaSpoutStreams; this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; + this.manualPartitionAssignment = builder.manualPartitionAssignment; this.tuplesBuilder = builder.tuplesBuilder; this.retryService 
= builder.retryService; } @@ -113,6 +118,8 @@ public static class Builder { private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; + private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; + private boolean manualPartitionAssignment = false; private final KafkaSpoutTuplesBuilder tuplesBuilder; private final KafkaSpoutRetryService retryService; @@ -229,6 +236,25 @@ public Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPol return this; } + /** + * Sets partition refresh period in milliseconds in manual partition assignment model. Default is 2s. + * @param partitionRefreshPeriodMs time in milliseconds + */ + public Builder setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) { + this.partitionRefreshPeriodMs = partitionRefreshPeriodMs; + return this; + } + + /** + * Defines whether the consumer manages partition manually. + * If set to true, the consumer manage partition manually, otherwise it will rely on kafka to do partition assignment. + * @param manualPartitionAssignment True if using manual partition assignment. 
+ */ + public Builder setManualPartitionAssignment(boolean manualPartitionAssignment) { + this.manualPartitionAssignment = manualPartitionAssignment; + return this; + } + public KafkaSpoutConfig build() { return new KafkaSpoutConfig<>(this); } @@ -307,6 +333,14 @@ public KafkaSpoutRetryService getRetryService() { return retryService; } + public long getPartitionRefreshPeriodMs() { + return partitionRefreshPeriodMs; + } + + public boolean isManualPartitionAssignment() { + return manualPartitionAssignment; + } + @Override public String toString() { return "KafkaSpoutConfig{" + diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java new file mode 100644 index 00000000000..05eb4553da4 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Comparator; + +/** + * Orders {@link TopicPartition}s by topic name and, within the same topic, by partition number. + */ +public class TopicPartitionComparator implements Comparator<TopicPartition> { + @Override + public int compare(TopicPartition o1, TopicPartition o2) { + final int topicOrder = o1.topic().compareTo(o2.topic()); + if (topicOrder != 0) { + return topicOrder; + } + /* Same topic: Integer.compare instead of the overflow-prone subtraction idiom. */ + return Integer.compare(o1.partition(), o2.partition()); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java new file mode 100644 index 00000000000..45ae3309976 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import java.util.concurrent.TimeUnit; + +public class Timer { + private final long delay; + private final long period; + private final TimeUnit timeUnit; + private final long periodNanos; + private long start; + + /** + * Creates a timer object that mimics a single threaded timer that expires periodically. If a call to {@link + * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, that method returns + * true. Each time the method returns true the counter is reset. The first period only starts counting after {@code delay}. + * + * @param delay the initial delay before the timer starts + * @param period the time that must elapse since the timer was initiated or reset before {@link #isExpiredResetOnTrue()} returns true + * @param timeUnit the time unit of delay and period + */ + public Timer(long delay, long period, TimeUnit timeUnit) { + this.delay = delay; + this.period = period; + this.timeUnit = timeUnit; + + periodNanos = timeUnit.toNanos(period); + start = System.nanoTime() + timeUnit.toNanos(delay); + } + + public long period() { + return period; + } + + public long delay() { + return delay; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the + * case the method returns true, otherwise it returns false. 
Each time this method returns true, the counter is reset + * (re-initiated) and a new cycle will start. + * + * @return true if the time elapsed since the timer was initiated or last reset is greater than {@code period}. Returns false + * otherwise. + */ + public boolean isExpiredResetOnTrue() { + final boolean expired = System.nanoTime() - start > periodNanos; + if (expired) { + start = System.nanoTime(); + } + return expired; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java new file mode 100644 index 00000000000..8ba70986f22 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal.fetcher; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.kafka.spout.KafkaSpoutStreams; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.regex.Pattern; + +public class AutomaticKafkaRecordsFetcher implements KafkaRecordsFetcher { + private static final Logger LOG = LoggerFactory.getLogger(AutomaticKafkaRecordsFetcher.class); + + private final KafkaConsumer kafkaConsumer; + private final ConsumerRebalanceListener consumerRebalanceListener; + + public AutomaticKafkaRecordsFetcher(KafkaConsumer kafkaConsumer, + ConsumerRebalanceListener consumerRebalanceListener, + KafkaSpoutStreams kafkaSpoutStreams) { + this.kafkaConsumer = kafkaConsumer; + this.consumerRebalanceListener = consumerRebalanceListener; + + subscribe(kafkaSpoutStreams); + } + + private void subscribe(KafkaSpoutStreams kafkaSpoutStreams) { + if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { + final List topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); + kafkaConsumer.subscribe(topics, consumerRebalanceListener); + LOG.info("Kafka consumer subscribed topics {}", topics); + } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); + kafkaConsumer.subscribe(pattern, consumerRebalanceListener); + LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); + } + // Initial poll to get the consumer registration process going. 
+ // The ConsumerRebalanceListener passed to the constructor will be called following this poll, upon partition registration + kafkaConsumer.poll(0); + } + + @Override + public ConsumerRecords fetchRecords(long fetchTimeoutMs) { + return kafkaConsumer.poll(fetchTimeoutMs); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java new file mode 100644 index 00000000000..47a61c137c5 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal.fetcher; + +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public interface KafkaRecordsFetcher { + ConsumerRecords fetchRecords(long fetchTimeoutMs); +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java new file mode 100644 index 00000000000..e3b7cd64305 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal.fetcher; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader; +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReaders; +import org.apache.storm.task.TopologyContext; + +import java.util.concurrent.TimeUnit; + +/** + * Static factory that selects the {@link KafkaRecordsFetcher} implementation matching the spout configuration. + */ +public final class KafkaRecordsFetchers { + /** Initial delay, in milliseconds, passed to the {@link Timer} that paces partition refreshes in manual assignment mode. */ + private static final long PARTITION_REFRESH_INITIAL_DELAY_MS = 500; + + private KafkaRecordsFetchers() { + /* Static factory holder: never instantiated. */ + } + + /** + * Creates a {@link ManualKafkaRecordsFetcher} when {@code kafkaSpoutConfig.isManualPartitionAssignment()} is true, + * otherwise an {@link AutomaticKafkaRecordsFetcher} that is handed {@code rebalanceListener} directly. 
+ */ + public static KafkaRecordsFetcher create(KafkaSpoutConfig kafkaSpoutConfig, + KafkaConsumer consumer, + TopologyContext context, + ConsumerRebalanceListener rebalanceListener) { + if (kafkaSpoutConfig.isManualPartitionAssignment()) { + int thisTaskIndex = context.getThisTaskIndex(); + int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); + KafkaPartitionReader partitionReader = KafkaPartitionReaders.create( + kafkaSpoutConfig.getKafkaSpoutStreams()); + Timer partitionRefreshTimer = new Timer(PARTITION_REFRESH_INITIAL_DELAY_MS, + kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + + ManualKafkaRecordsFetcher.PartitionAssignmentChangeListener partitionAssignmentChangeListener = + ManualKafkaRecordsFetcher.listenerOf(rebalanceListener); + + return new ManualKafkaRecordsFetcher<>(consumer, thisTaskIndex, totalTaskCount, partitionReader, + partitionRefreshTimer, partitionAssignmentChangeListener); + } else { + return new AutomaticKafkaRecordsFetcher<>(consumer, rebalanceListener, + kafkaSpoutConfig.getKafkaSpoutStreams()); + } + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java new file mode 100644 index 00000000000..d02a59acff4 --- /dev/null +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal.fetcher; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader; +import org.apache.storm.kafka.spout.TopicPartitionComparator; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +public class ManualKafkaRecordsFetcher implements KafkaRecordsFetcher { + private static final Comparator KAFKA_TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator(); + + private final KafkaConsumer consumer; + private final int thisTaskIndex; + private final int totalTaskCount; + private final KafkaPartitionReader partitionReader; + private final Timer partitionRefreshTimer; + private final 
PartitionAssignmentChangeListener partitionAssignmentChangeListener; + private Set myPartitions = Collections.emptySet(); + + public ManualKafkaRecordsFetcher(KafkaConsumer consumer, + int thisTaskIndex, + int totalTaskCount, + KafkaPartitionReader partitionReader, + Timer partitionRefreshTimer, + PartitionAssignmentChangeListener partitionAssignmentChangeListener) { + this.consumer = consumer; + this.thisTaskIndex = thisTaskIndex; + this.totalTaskCount = totalTaskCount; + this.partitionReader = partitionReader; + this.partitionRefreshTimer = partitionRefreshTimer; + this.partitionAssignmentChangeListener = partitionAssignmentChangeListener; + + doRefreshMyPartitions(); + } + + private void refreshMyPartitionsIfNeeded() { + if (!partitionRefreshTimer.isExpiredResetOnTrue()) { + return; + } + + doRefreshMyPartitions(); + } + + private void doRefreshMyPartitions() { + List topicPartitions = partitionReader.readPartitions(consumer); + Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR); + + Set curPartitions = new HashSet<>(topicPartitions.size()/totalTaskCount+1); + for (int i=thisTaskIndex; i fetchRecords(long fetchTimeoutMs) { + refreshMyPartitionsIfNeeded(); + + return consumer.poll(fetchTimeoutMs); + } + + @FunctionalInterface + public interface PartitionAssignmentChangeListener { + void onPartitionAssignmentChange(Set oldPartitions, Set newPartitions); + } + + public static PartitionAssignmentChangeListener listenerOf(final ConsumerRebalanceListener consumerRebalanceListener) { + return (oldPartitions, newPartitions) -> { + consumerRebalanceListener.onPartitionsRevoked(oldPartitions); + consumerRebalanceListener.onPartitionsAssigned(newPartitions); + }; + } +} + diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java new file mode 100644 index 
00000000000..e3480acc4fc --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal.partition; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +public interface KafkaPartitionReader { + List readPartitions(KafkaConsumer consumer); +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java new file mode 100644 index 00000000000..4e51c1e52ec --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal.partition; + +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutStreams; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics; + +import java.util.HashSet; + +/** + * Static factory for {@link KafkaPartitionReader}s plus a small {@link PartitionInfo} conversion helper. + */ +public final class KafkaPartitionReaders { + private KafkaPartitionReaders() { + /* Static helpers only: never instantiated. */ + } + + /** + * Returns a {@link NamedTopicPartitionReader} for named-topic streams and a {@link WildcardTopicPartitionReader} for wildcard streams. 
+ * + * @throws IllegalArgumentException if {@code kafkaSpoutStreams} is of neither kind + */ + public static KafkaPartitionReader create(KafkaSpoutStreams kafkaSpoutStreams) { + if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { + return new NamedTopicPartitionReader(new HashSet<>( + KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams).getTopics())); + } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + return new WildcardTopicPartitionReader( + KafkaSpoutStreamsWildcardTopics.class.cast(kafkaSpoutStreams).getTopicWildcardPattern()); + } else { + throw new IllegalArgumentException("Unrecognized kafka spout stream: " + kafkaSpoutStreams.getClass()); + } + } + + /** Builds the {@link TopicPartition} identified by the topic and partition number of {@code partitionInfo}. */ + public static TopicPartition toTopicPartition(PartitionInfo partitionInfo) { + return new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java new file mode 100644 index 00000000000..41db16914e4 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal.partition; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * Reads the partitions of an explicitly named set of topics. + */ +public class NamedTopicPartitionReader implements KafkaPartitionReader { + private final Set<String> topics; + + public NamedTopicPartitionReader(Set<String> topics) { + this.topics = topics; + } + + /** + * Collects one {@link TopicPartition} per partition of every configured topic. Topics for which the consumer + * returns no partition metadata are skipped. + */ + @Override + public List<TopicPartition> readPartitions(KafkaConsumer consumer) { + List<TopicPartition> topicPartitions = new ArrayList<>(); + for (String topic : topics) { + List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); + if (partitionInfos == null) { + /* NOTE(review): some client versions appear to return null for an unknown topic; skip it rather than fail with an NPE. */ + continue; + } + for (PartitionInfo partitionInfo : partitionInfos) { + topicPartitions.add(KafkaPartitionReaders.toTopicPartition(partitionInfo)); + } + } + + return topicPartitions; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java new file mode 100644 index 00000000000..fcac1c1fb83 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal.partition; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class WildcardTopicPartitionReader implements KafkaPartitionReader { + private final Pattern wildcardTopicPattern; + + public WildcardTopicPartitionReader(Pattern wildcardTopicPattern) { + this.wildcardTopicPattern = wildcardTopicPattern; + } + + @Override + public List<TopicPartition> readPartitions(KafkaConsumer consumer) { + List<TopicPartition> topicPartitions = new ArrayList<>(); + Map<String, List<PartitionInfo>> partitionsByTopic = consumer.listTopics(); /* topic name -> partition metadata; filtered by the wildcard pattern below */ + for (Map.Entry<String, List<PartitionInfo>> entry : partitionsByTopic.entrySet()) { + if (wildcardTopicPattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo: entry.getValue()) { + topicPartitions.add(KafkaPartitionReaders.toTopicPartition(partitionInfo)); + } + } + } + + return topicPartitions; + } +}