diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java deleted file mode 100644 index 23444776d6a..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionPatternSubscription extends PatternSubscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner parter; - private Set currentAssignment = null; - private KafkaConsumer consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; - - public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) { - super(pattern); - this.parter = parter; - } - - @Override - public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List allPartitions = new ArrayList<>(); - for (Map.Entry> entry: consumer.listTopics().entrySet()) { - if (pattern.matcher(entry.getKey()).matches()) { - for (PartitionInfo partitionInfo: entry.getValue()) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } - } - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set newAssignment = new HashSet<>(parter.partition(allPartitions, context)); - if (!newAssignment.equals(currentAssignment)) { - if (currentAssignment != null) { - listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); - } - currentAssignment = newAssignment; - consumer.assign(currentAssignment); - } - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java similarity index 72% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java index 926fdf02ec2..2c65d6d8626 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -18,36 +18,29 @@ package org.apache.storm.kafka.spout; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; -public class ManualPartitionNamedSubscription extends NamedSubscription { +public class ManualPartitionSubscription extends Subscription { private static final long serialVersionUID = 5633018073527583826L; private final ManualPartitioner partitioner; + private final TopicFilter partitionFilter; private Set currentAssignment = null; private KafkaConsumer consumer = null; private ConsumerRebalanceListener listener = null; private TopologyContext context = null; - public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection topics) { - super(topics); + public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { + this.partitionFilter = partitionFilter; this.partitioner = parter; } - public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) { - this(parter, Arrays.asList(topics)); - } - @Override public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { this.consumer = consumer; @@ -58,21 +51,21 @@ public void subscribe(KafkaConsumer consumer, ConsumerRebalanceList @Override public void refreshAssignment() { - List allPartitions = new ArrayList<>(); - for (String topic : topics) { - for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } + List allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); Set newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); if (!newAssignment.equals(currentAssignment)) { + consumer.assign(newAssignment); if (currentAssignment != null) { listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); } currentAssignment = newAssignment; - consumer.assign(currentAssignment); + listener.onPartitionsAssigned(newAssignment); } } + + @Override + public String getTopicsString() { + return partitionFilter.getTopicsString(); + } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java new file mode 100644 index 00000000000..982828d51e6 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for the specified topics. + */ +public class NamedTopicFilter implements TopicFilter { + + private final Set topics; + + /** + * Create filter based on a set of topic names. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(Set topics) { + this.topics = Collections.unmodifiableSet(topics); + } + + /** + * Convenience constructor. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(String... topics) { + this(new HashSet<>(Arrays.asList(topics))); + } + + @Override + public List getFilteredTopicPartitions(KafkaConsumer consumer) { + List allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java new file mode 100644 index 00000000000..29648748736 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for topics matching the given {@link Pattern}. + */ +public class PatternTopicFilter implements TopicFilter { + + private final Pattern pattern; + private final Set topics = new HashSet<>(); + + /** + * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. + * + * @param pattern The Pattern to use. + */ + public PatternTopicFilter(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public List getFilteredTopicPartitions(KafkaConsumer consumer) { + topics.clear(); + List allPartitions = new ArrayList<>(); + for (Map.Entry> entry : consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo : entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + topics.add(partitionInfo.topic()); + } + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } + + public String getTopicsPattern() { + return pattern.pattern(); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java index 53e825ae92c..9c5a8c4a388 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java @@ -37,7 +37,7 @@ public abstract class Subscription implements Serializable { public abstract void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context); /** - * @return a string representing the subscribed topics. + * @return A human-readable string representing the subscribed topics. */ public abstract String getTopicsString(); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java new file mode 100644 index 00000000000..7631c8a7d45 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public interface TopicFilter extends Serializable { + + /** + * Get the Kafka TopicPartitions passed by this filter. + * @param consumer The Kafka consumer to use to read the list of existing partitions + * @return The Kafka partitions passed by this filter. + */ + List getFilteredTopicPartitions(KafkaConsumer consumer); + + /** + * @return A human-readable string representing the topics that pass the filter. + */ + String getTopicsString(); + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java new file mode 100644 index 00000000000..e97c7e1f2a7 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class NamedTopicFilterTest { + + private KafkaConsumer consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); + + when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List partitionTwoPartitions = new ArrayList<>(); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); + when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java new file mode 100644 index 00000000000..877efdcc9ad --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class PatternTopicFilterTest { + + private KafkaConsumer consumerMock; + + @Before + public void setUp(){ + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + Pattern pattern = Pattern.compile("test-\\d+"); + PatternTopicFilter filter = new PatternTopicFilter(pattern); + + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + Map> allTopics = new HashMap<>(); + allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List testTwoPartitions = new ArrayList<>(); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + allTopics.put(matchingTopicTwo, testTwoPartitions); + allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + when(consumerMock.listTopics()).thenReturn(allTopics); + + List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } +}