KafkaSpout.java
@@ -24,7 +24,6 @@
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +56,7 @@
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -68,7 +68,7 @@
 public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private static final long serialVersionUID = 4151921085047987154L;
-    //Initial delay for the commit and subscription refresh timers
+    //Initial delay for the commit and assignment refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
 
@@ -77,7 +77,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private final KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private final TopicAssigner topicAssigner;
     private transient KafkaConsumer<K, V> kafkaConsumer;
 
     // Bookkeeping
@@ -99,19 +100,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
-    // Triggers when a subscription should be refreshed
-    private transient Timer refreshSubscriptionTimer;
+    // Triggers when an assignment should be refreshed
+    private transient Timer refreshAssignmentTimer;
     private transient TopologyContext context;
     private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
+    private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>(), new TopicAssigner());
    }
 
    @VisibleForTesting
-    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.topicAssigner = topicAssigner;
         this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
@@ -134,13 +137,15 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
             // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
-        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
         commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
+        rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
+
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
             registerMetric();
@@ -267,8 +272,8 @@ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
     @Override
     public void nextTuple() {
         try {
-            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-                kafkaSpoutConfig.getSubscription().refreshAssignment();
+            if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
+                refreshAssignment();
             }
 
             if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
@@ -617,16 +622,20 @@ public void fail(Object messageId) {
     @Override
     public void activate() {
         try {
-            subscribeKafkaConsumer();
+            kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+            refreshAssignment();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
     }
 
-    private void subscribeKafkaConsumer() {
-        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
-
-        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
+    private void refreshAssignment() {
+        Set<TopicPartition> allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(kafkaConsumer);
+        List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
+        Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
+            .getPartitionsForThisTask(allPartitionsSorted, context);
+        topicAssigner.assignPartitions(kafkaConsumer, assignedPartitions, rebalanceListener);
     }
 
     @Override
@@ -691,7 +700,7 @@ public Map<String, Object> getComponentConfiguration() {
     }
 
     private String getTopicsString() {
-        return kafkaSpoutConfig.getSubscription().getTopicsString();
+        return kafkaSpoutConfig.getTopicFilter().getTopicsString();
     }
 
     private static class PollablePartitionsInfo {
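Reviewer note: with this change the spout no longer delegates subscription to a Subscription object. It discovers partitions with a TopicFilter, picks this task's share with a ManualPartitioner, and applies the result with a TopicAssigner. A minimal standalone sketch of that flow follows, using only names visible in this diff; the exact parameter types of TopicAssigner#assignPartitions are an assumption (the spout passes its KafkaSpoutConsumerRebalanceListener, taken here to be a Kafka ConsumerRebalanceListener).

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.TopicPartitionComparator;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.TopicAssigner;
    import org.apache.storm.kafka.spout.subscription.TopicFilter;
    import org.apache.storm.task.TopologyContext;

    class AssignmentFlowSketch {
        // Mirrors KafkaSpout#refreshAssignment() above; all collaborators are passed in.
        static void refresh(TopicFilter filter, ManualPartitioner partitioner, TopicAssigner assigner,
                KafkaConsumer<?, ?> consumer, TopologyContext context, ConsumerRebalanceListener listener) {
            // 1. Discover every partition any task of this spout may read.
            Set<TopicPartition> all = filter.getAllSubscribedPartitions(consumer);
            // 2. Sort so every task sees the same ordering, then keep only this task's share.
            List<TopicPartition> sorted = new ArrayList<>(all);
            Collections.sort(sorted, TopicPartitionComparator.INSTANCE);
            Set<TopicPartition> mine = partitioner.getPartitionsForThisTask(sorted, context);
            // 3. Manually assign the chosen partitions on the consumer.
            assigner.assignPartitions(consumer, mine, listener);
        }
    }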
KafkaSpoutConfig.java

@@ -33,11 +33,11 @@
 import org.apache.storm.Config;
 import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
 import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
 import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +75,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
-    private final Subscription subscription;
+    private final TopicFilter topicFilter;
+    private final ManualPartitioner topicPartitioner;
     private final long pollTimeoutMs;
 
     // Kafka spout configuration
@@ -99,7 +100,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public KafkaSpoutConfig(Builder<K, V> builder) {
         setKafkaPropsForProcessingGuarantee(builder);
         this.kafkaProps = builder.kafkaProps;
-        this.subscription = builder.subscription;
+        this.topicFilter = builder.topicFilter;
+        this.topicPartitioner = builder.topicPartitioner;
         this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
@@ -175,7 +177,8 @@ public enum ProcessingGuarantee {
     public static class Builder<K, V> {
 
         private final Map<String, Object> kafkaProps;
-        private final Subscription subscription;
+        private final TopicFilter topicFilter;
+        private final ManualPartitioner topicPartitioner;
         private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
@@ -190,31 +193,32 @@ public static class Builder<K, V> {
         private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
 
         public Builder(String bootstrapServers, String... topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         public Builder(String bootstrapServers, Set<String> topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
-                new NamedTopicFilter(topics)));
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         public Builder(String bootstrapServers, Pattern topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+            this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         /**
          * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
          *
          * @param bootstrapServers The bootstrap servers the consumer will use
-         * @param subscription The subscription defining which topics and partitions each spout instance will read.
+         * @param topicFilter The topic filter defining which topics and partitions the spout will read
+         * @param topicPartitioner The topic partitioner defining which topics and partitions are assigned to each spout task
          */
-        public Builder(String bootstrapServers, Subscription subscription) {
+        public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
             kafkaProps = new HashMap<>();
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                 throw new IllegalArgumentException("bootstrap servers cannot be null");
             }
             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-            this.subscription = subscription;
+            this.topicFilter = topicFilter;
+            this.topicPartitioner = topicPartitioner;
             this.translator = new DefaultRecordTranslator<>();
         }
 
@@ -358,9 +362,8 @@ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>
         }
 
         /**
-         * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new
-         * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
-         * PatternSubscription rely on kafka to handle this instead.
+         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+         * partitions.
          *
          * @param partitionRefreshPeriodMs time in milliseconds
         * @return the builder (this)
@@ -502,8 +505,12 @@ public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
 
-    public Subscription getSubscription() {
-        return subscription;
+    public TopicFilter getTopicFilter() {
+        return topicFilter;
     }
+
+    public ManualPartitioner getTopicPartitioner() {
+        return topicPartitioner;
+    }
 
     public RecordTranslator<K, V> getTranslator() {
@@ -566,7 +573,8 @@ public String toString() {
             + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
             + ", maxUncommittedOffsets=" + maxUncommittedOffsets
             + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
-            + ", subscription=" + subscription
+            + ", topicFilter=" + topicFilter
+            + ", topicPartitioner=" + topicPartitioner
             + ", translator=" + translator
             + ", retryService=" + retryService
             + ", tupleListener=" + tupleListener
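For callers, the visible effect of this file's changes is the Builder signature. A minimal migration sketch, assuming the broker address and topic name as placeholders and that Builder#build() is otherwise unchanged:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;

    class BuilderMigrationSketch {
        static KafkaSpoutConfig<String, String> configure() {
            // Before this PR (for comparison):
            //   new KafkaSpoutConfig.Builder<String, String>("broker:9092",
            //       new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter("my-topic")));
            // After this PR, the filter and the partitioner are passed directly:
            return new KafkaSpoutConfig.Builder<String, String>(
                    "broker:9092", new NamedTopicFilter("my-topic"), new RoundRobinManualPartitioner())
                .build();
        }
    }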

This file was deleted.

ManualPartitioner.java

@@ -20,22 +20,25 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 
 /**
  * A function used to assign partitions to this spout.
- * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
+ *
+ * <p>WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
  * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
  * number of spouts to avoid missing partitions or double assigning partitions.
  */
 @FunctionalInterface
 public interface ManualPartitioner extends Serializable {
     /**
-     * Get the partitions for this assignment.
-     * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+     * Filter the list of all partitions handled by this set of spouts to get only the partitions assigned to this task.
+     * @param allPartitionsSorted all of the partitions that the set of spouts want to subscribe to
+     *     in a strict ordering that is consistent across tasks
      * @param context the context of the topology
-     * @return the subset of the partitions that this spout should use.
+     * @return the subset of the partitions that this spout task should handle.
      */
-    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+    public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context);
 }
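The interface change above is the heart of the refactor: implementations now receive the full, consistently sorted partition list and return the set this task should own. A sketch of a custom implementation along the lines the javadoc suggests; the name ModuloPartitioner and the round-robin-by-task-index strategy are illustrative, not part of this PR (RoundRobinManualPartitioner's actual body is not shown in this diff):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
    import org.apache.storm.task.TopologyContext;

    // Hypothetical partitioner: task i of n takes partitions i, i+n, i+2n, ...
    public class ModuloPartitioner implements ManualPartitioner {
        @Override
        public Set<TopicPartition> getPartitionsForThisTask(
                List<TopicPartition> allPartitionsSorted, TopologyContext context) {
            int thisTask = context.getThisTaskIndex();
            int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
            Set<TopicPartition> myPartitions = new HashSet<>();
            for (int i = thisTask; i < allPartitionsSorted.size(); i += totalTasks) {
                // The input ordering is consistent across tasks, so each partition
                // lands on exactly one task and none are missed.
                myPartitions.add(allPartitionsSorted.get(i));
            }
            return myPartitions;
        }
    }

Since the interface is annotated @FunctionalInterface, the same logic could also be supplied as a lambda.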
NamedTopicFilter.java

@@ -16,7 +16,6 @@
 
 package org.apache.storm.kafka.spout.subscription;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -50,8 +49,8 @@ public NamedTopicFilter(String... topics) {
     }
 
     @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
-        List<TopicPartition> allPartitions = new ArrayList<>();
+    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
+        Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topic : topics) {
             for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
                 allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
PatternTopicFilter.java

@@ -17,6 +17,7 @@
 package org.apache.storm.kafka.spout.subscription;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
 
 /**
  * Filter that returns all partitions for topics matching the given {@link Pattern}.
@@ -44,9 +46,9 @@ public PatternTopicFilter(Pattern pattern) {
     }
 
     @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
         topics.clear();
-        List<TopicPartition> allPartitions = new ArrayList<>();
+        Set<TopicPartition> allPartitions = new HashSet<>();
         for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
             if (pattern.matcher(entry.getKey()).matches()) {
                 for (PartitionInfo partitionInfo : entry.getValue()) {
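Both filter implementations now collect into a HashSet, so a partition cannot be added twice even if topic discovery overlaps. The TopicFilter interface itself does not appear in this diff, so the following sketch of a custom implementation infers its member set (getAllSubscribedPartitions plus getTopicsString) from the usages above; SingleTopicFilter is a hypothetical example, and NamedTopicFilter already covers this case in practice:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.TopicFilter;

    // Hypothetical filter for a single, fixed topic; only illustrates the inferred interface shape.
    public class SingleTopicFilter implements TopicFilter {
        private final String topic;

        public SingleTopicFilter(String topic) {
            this.topic = topic;
        }

        @Override
        public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
            Set<TopicPartition> partitions = new HashSet<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            return partitions;
        }

        @Override
        public String getTopicsString() {
            return topic;
        }
    }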