From a20b2659c20b1a12dd455023010f4454d62ee313 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 18 Aug 2017 22:13:38 +0200 Subject: [PATCH] STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly --- .../apache/storm/kafka/spout/KafkaSpout.java | 41 ++-- .../storm/kafka/spout/KafkaSpoutConfig.java | 44 ++-- .../ManualPartitionSubscription.java | 69 ------ .../spout/subscription/ManualPartitioner.java | 13 +- .../spout/subscription/NamedTopicFilter.java | 5 +- .../subscription/PatternTopicFilter.java | 6 +- .../RoundRobinManualPartitioner.java | 11 +- .../spout/subscription/Subscription.java | 57 ----- .../spout/subscription/TopicAssigner.java | 51 +++++ .../kafka/spout/subscription/TopicFilter.java | 8 +- .../KafkaTridentSpoutBatchMetadata.java | 55 ++--- .../trident/KafkaTridentSpoutEmitter.java | 130 +++++------ .../trident/KafkaTridentSpoutManager.java | 116 ---------- .../trident/KafkaTridentSpoutOpaque.java | 26 ++- .../KafkaTridentSpoutOpaqueCoordinator.java | 43 +++- .../KafkaTridentSpoutTopicPartition.java | 6 +- ...fkaTridentSpoutTopicPartitionRegistry.java | 47 ---- .../KafkaTridentSpoutTransactional.java | 49 ---- .../kafka/spout/KafkaSpoutAbstractTest.java | 13 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 7 +- .../KafkaSpoutLogCompactionSupportTest.java | 5 +- .../KafkaSpoutMessagingGuaranteeTest.java | 21 +- .../spout/KafkaSpoutReactivationTest.java | 3 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 67 +++--- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 15 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 1 - .../SpoutWithMockedConsumerSetupHelper.java | 25 +- .../SingleTopicKafkaSpoutConfiguration.java | 7 +- .../subscription/NamedTopicFilterTest.java | 3 +- .../subscription/PatternTopicFilterTest.java | 3 +- .../RoundRobinManualPartitionerTest.java | 76 +++++++ ...iptionTest.java => TopicAssignerTest.java} | 33 +-- .../KafkaTridentSpoutBatchMetadataTest.java | 13 +- .../trident/KafkaTridentSpoutEmitterTest.java | 214 ++++++++++++++++++ ...afkaTridentSpoutOpaqueCoordinatorTest.java | 111 +++++++++ pom.xml | 12 +- .../spout/IOpaquePartitionedTridentSpout.java | 21 +- ...OpaquePartitionedTridentSpoutExecutor.java | 10 +- 38 files changed, 807 insertions(+), 630 deletions(-) delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java rename external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/{ManualPartitionSubscriptionTest.java => TopicAssignerTest.java} (69%) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java create mode 100644 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 39bdb93bdc3..9f9f5bb55a7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; import com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -57,6 +56,7 @@ import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -68,7 +68,7 @@ public class KafkaSpout extends BaseRichSpout { private static final long serialVersionUID = 4151921085047987154L; - //Initial delay for the commit and subscription refresh timers + //Initial delay for the commit and assignment refresh timers public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); @@ -77,7 +77,8 @@ public class KafkaSpout extends BaseRichSpout { // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; - private KafkaConsumerFactory kafkaConsumerFactory; + private final KafkaConsumerFactory kafkaConsumerFactory; + private final TopicAssigner topicAssigner; private transient KafkaConsumer kafkaConsumer; // Bookkeeping @@ -99,19 +100,21 @@ public class KafkaSpout extends BaseRichSpout { private transient Set emitted; // Records that have been polled and are queued to be emitted in the nextTuple() call. 
One record is emitted per nextTuple() private transient Map>> waitingToEmit; - // Triggers when a subscription should be refreshed - private transient Timer refreshSubscriptionTimer; + // Triggers when an assignment should be refreshed + private transient Timer refreshAssignmentTimer; private transient TopologyContext context; private transient CommitMetadataManager commitMetadataManager; private transient KafkaOffsetMetric kafkaOffsetMetric; + private transient KafkaSpoutConsumerRebalanceListener rebalanceListener; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>(), new TopicAssigner()); } @VisibleForTesting - KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory) { + KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory, TopicAssigner topicAssigner) { this.kafkaConsumerFactory = kafkaConsumerFactory; + this.topicAssigner = topicAssigner; this.kafkaSpoutConfig = kafkaSpoutConfig; } @@ -134,13 +137,15 @@ public void open(Map conf, TopologyContext context, SpoutOutputC // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } - refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); offsetManagers = new HashMap<>(); emitted = new HashSet<>(); waitingToEmit = new HashMap<>(); commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee()); + rebalanceListener = new KafkaSpoutConsumerRebalanceListener(); + tupleListener.open(conf, context); if (canRegisterMetrics()) { registerMetric(); @@ -267,8 +272,8 @@ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { @Override public void nextTuple() { try { - if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { - kafkaSpoutConfig.getSubscription().refreshAssignment(); + if (refreshAssignmentTimer.isExpiredResetOnTrue()) { + refreshAssignment(); } if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { @@ -617,16 +622,20 @@ public void fail(Object messageId) { @Override public void activate() { try { - subscribeKafkaConsumer(); + kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); + refreshAssignment(); } catch (InterruptException e) { throwKafkaConsumerInterruptedException(); } } - private void subscribeKafkaConsumer() { - kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); - - kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context); + private void refreshAssignment() { + Set allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(kafkaConsumer); + List allPartitionsSorted = new ArrayList<>(allPartitions); + Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE); + Set assignedPartitions = kafkaSpoutConfig.getTopicPartitioner() + .getPartitionsForThisTask(allPartitionsSorted, context); + topicAssigner.assignPartitions(kafkaConsumer, assignedPartitions, rebalanceListener); } @Override @@ -691,7 +700,7 @@ public Map getComponentConfiguration() { } private String getTopicsString() { - return 
kafkaSpoutConfig.getSubscription().getTopicsString(); + return kafkaSpoutConfig.getTopicFilter().getTopicsString(); } private static class PollablePartitionsInfo { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 40e449a37b0..3b7be2b4736 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -33,11 +33,11 @@ import org.apache.storm.Config; import org.apache.storm.annotation.InterfaceStability; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; -import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; import org.apache.storm.kafka.spout.subscription.NamedTopicFilter; import org.apache.storm.kafka.spout.subscription.PatternTopicFilter; import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +75,8 @@ public class KafkaSpoutConfig implements Serializable { // Kafka consumer configuration private final Map kafkaProps; - private final Subscription subscription; + private final TopicFilter topicFilter; + private final ManualPartitioner topicPartitioner; private final long pollTimeoutMs; // Kafka spout configuration @@ -99,7 +100,8 @@ public class KafkaSpoutConfig implements Serializable { public KafkaSpoutConfig(Builder builder) { setKafkaPropsForProcessingGuarantee(builder); this.kafkaProps = builder.kafkaProps; - this.subscription = builder.subscription; + this.topicFilter = builder.topicFilter; + this.topicPartitioner = builder.topicPartitioner; this.translator = builder.translator; this.pollTimeoutMs = builder.pollTimeoutMs; this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; @@ -175,7 +177,8 @@ public enum ProcessingGuarantee { public static class Builder { private final Map kafkaProps; - private final Subscription subscription; + private final TopicFilter topicFilter; + private final ManualPartitioner topicPartitioner; private RecordTranslator translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; @@ -190,31 +193,32 @@ public static class Builder { private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS; public Builder(String bootstrapServers, String... 
topics) { - this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); + this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner()); } public Builder(String bootstrapServers, Set topics) { - this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), - new NamedTopicFilter(topics))); + this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner()); } public Builder(String bootstrapServers, Pattern topics) { - this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); + this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner()); } /** * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers. * * @param bootstrapServers The bootstrap servers the consumer will use - * @param subscription The subscription defining which topics and partitions each spout instance will read. + * @param topicFilter The topic filter defining which topics and partitions the spout will read + * @param topicPartitioner The topic partitioner defining which topics and partitions are assigned to each spout task */ - public Builder(String bootstrapServers, Subscription subscription) { + public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) { kafkaProps = new HashMap<>(); if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new IllegalArgumentException("bootstrap servers cannot be null"); } kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - this.subscription = subscription; + this.topicFilter = topicFilter; + this.topicPartitioner = topicPartitioner; this.translator = new DefaultRecordTranslator<>(); } @@ -358,9 +362,8 @@ public Builder setRecordTranslator(Func, List } /** - * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new - * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and - * PatternSubscription rely on kafka to handle this instead. + * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new + * partitions.
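For illustration, a minimal usage sketch of the reworked builder, assuming only the constructors shown in this patch (the broker address and topic name are invented):

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;

    // Spelled-out equivalent of new KafkaSpoutConfig.Builder<>("broker1:9092", "orders"):
    // the filter picks the topics, the partitioner splits them across spout tasks.
    KafkaSpoutConfig<String, String> conf = new KafkaSpoutConfig.Builder<String, String>(
            "broker1:9092", new NamedTopicFilter("orders"), new RoundRobinManualPartitioner())
        .build();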
* * @param partitionRefreshPeriodMs time in milliseconds * @return the builder (this) @@ -502,8 +505,12 @@ public Map getKafkaProps() { return kafkaProps; } - public Subscription getSubscription() { - return subscription; + public TopicFilter getTopicFilter() { + return topicFilter; + } + + public ManualPartitioner getTopicPartitioner() { + return topicPartitioner; } public RecordTranslator getTranslator() { @@ -566,7 +573,8 @@ public String toString() { + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + ", maxUncommittedOffsets=" + maxUncommittedOffsets + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy - + ", subscription=" + subscription + + ", topicFilter=" + topicFilter + + ", topicPartitioner=" + topicPartitioner + ", translator=" + translator + ", retryService=" + retryService + ", tupleListener=" + tupleListener diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java deleted file mode 100644 index 8e74abbd35e..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout.subscription; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.TopicPartitionComparator; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionSubscription extends Subscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner partitioner; - private final TopicFilter partitionFilter; - private transient KafkaConsumer consumer = null; - private transient ConsumerRebalanceListener listener = null; - private transient TopologyContext context = null; - - public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { - this.partitionFilter = partitionFilter; - this.partitioner = parter; - } - - @Override - public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); - Set currentAssignment = consumer.assignment(); - if (!newAssignment.equals(currentAssignment)) { - listener.onPartitionsRevoked(currentAssignment); - consumer.assign(newAssignment); - listener.onPartitionsAssigned(newAssignment); - } - } - - @Override - public String getTopicsString() { - return partitionFilter.getTopicsString(); - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java index 0e35b737dac..9db06139e83 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java @@ -20,22 +20,25 @@ import java.io.Serializable; import java.util.List; +import java.util.Set; import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; /** * A function used to assign partitions to this spout. - * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions. + * + *
<p>
WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions. * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total * number of spouts to avoid missing partitions or double assigning partitions. */ @FunctionalInterface public interface ManualPartitioner extends Serializable { /** - * Get the partitions for this assignment. - * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering + * Filter the list of all partitions handled by this set of spouts to get only the partitions assigned to this task. + * @param allPartitionsSorted all of the partitions that the set of spouts want to subscribe to + * in a strict ordering that is consistent across tasks * @param context the context of the topology - * @return the subset of the partitions that this spout should use. + * @return the subset of the partitions that this spout task should handle. */ - public List partition(List allPartitions, TopologyContext context); + public Set getPartitionsForThisTask(List allPartitionsSorted, TopologyContext context); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java index d6e5fc2f07f..7c25596f7d9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java @@ -16,7 +16,6 @@ package org.apache.storm.kafka.spout.subscription; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -50,8 +49,8 @@ public NamedTopicFilter(String... topics) { } @Override - public List getFilteredTopicPartitions(KafkaConsumer consumer) { - List allPartitions = new ArrayList<>(); + public Set getAllSubscribedPartitions(KafkaConsumer consumer) { + Set allPartitions = new HashSet<>(); for (String topic : topics) { for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java index 98f8b23b387..554876fe7c8 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java @@ -17,6 +17,7 @@ package org.apache.storm.kafka.spout.subscription; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -25,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; /** * Filter that returns all partitions for topics matching the given {@link Pattern}. 
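Since the ManualPartitioner contract above is the main extension point this patch reshapes, a hypothetical implementation may help; this is a hedged sketch, not part of the patch. The one hard requirement is that every task derives its subset deterministically from the same sorted input, so no partition is dropped or assigned twice:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
    import org.apache.storm.task.TopologyContext;

    // Hypothetical partitioner that keeps all partitions of a topic on one task.
    public class TopicHashPartitioner implements ManualPartitioner {
        @Override
        public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
            int taskIndex = context.getThisTaskIndex();
            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            Set<TopicPartition> myPartitions = new HashSet<>();
            for (TopicPartition tp : allPartitionsSorted) {
                // floorMod keeps the bucket non-negative even for negative hash codes
                if (Math.floorMod(tp.topic().hashCode(), numTasks) == taskIndex) {
                    myPartitions.add(tp);
                }
            }
            return myPartitions;
        }
    }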
@@ -44,9 +46,9 @@ public PatternTopicFilter(Pattern pattern) { } @Override - public List getFilteredTopicPartitions(KafkaConsumer consumer) { + public Set getAllSubscribedPartitions(KafkaConsumer consumer) { topics.clear(); - List allPartitions = new ArrayList<>(); + Set allPartitions = new HashSet<>(); for (Map.Entry> entry : consumer.listTopics().entrySet()) { if (pattern.matcher(entry.getKey()).matches()) { for (PartitionInfo partitionInfo : entry.getValue()) { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java index 9660c77ddbe..ee2916a6763 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java @@ -18,7 +18,6 @@ package org.apache.storm.kafka.spout.subscription; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -38,13 +37,13 @@ public class RoundRobinManualPartitioner implements ManualPartitioner { @Override - public List partition(List allPartitions, TopologyContext context) { + public Set getPartitionsForThisTask(List allPartitionsSorted, TopologyContext context) { int thisTaskIndex = context.getThisTaskIndex(); int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); - Set myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1); - for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) { - myPartitions.add(allPartitions.get(i)); + Set myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1); + for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) { + myPartitions.add(allPartitionsSorted.get(i)); } - return new ArrayList<>(myPartitions); + return myPartitions; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java deleted file mode 100644 index 6fd8c2b2351..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout.subscription; - -import java.io.Serializable; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; - -/** - * A subscription to kafka. - */ -public abstract class Subscription implements Serializable { - private static final long serialVersionUID = -216136367240198716L; - - /** - * Subscribe the KafkaConsumer to the proper topics. - * Implementations must ensure that a given topic partition is always assigned to the same spout task. - * Adding and removing partitions as necessary is fine, but partitions must not move from one task to another. - * This constraint is only important for use with the Trident spout. - * @param consumer the Consumer to get. - * @param listener the rebalance listener to include in the subscription - */ - public abstract void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context); - - /** - * Get the topics string. - * @return A human-readable string representing the subscribed topics. - */ - public abstract String getTopicsString(); - - /** - * NOOP is the default behavior, which means that Kafka will internally handle partition assignment. - * If you wish to do manual partition management, you must provide an implementation of this method - * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe - * to inform the rest of the system of those changes. - */ - public void refreshAssignment() { - //NOOP - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java new file mode 100644 index 00000000000..dcc93ceef07 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +/** + * Handles assigning partitions to the consumer and updating the rebalance listener. + */ +public class TopicAssigner implements Serializable { + + private static final long serialVersionUID = 5423018073527583826L; + + /** + * Assign partitions to the KafkaConsumer. + * @param The consumer key type + * @param The consumer value type + * @param consumer The Kafka consumer to assign partitions to + * @param newAssignment The partitions to assign. 
+ * @param listener The rebalance listener to call back on when the assignment changes + */ + public void assignPartitions(KafkaConsumer consumer, Set newAssignment, + ConsumerRebalanceListener listener) { + Set currentAssignment = consumer.assignment(); + if (!newAssignment.equals(currentAssignment)) { + listener.onPartitionsRevoked(currentAssignment); + consumer.assign(newAssignment); + listener.onPartitionsAssigned(newAssignment); + } + } + +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java index 6af516f5f3c..ae2c2549854 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java @@ -17,18 +17,18 @@ package org.apache.storm.kafka.spout.subscription; import java.io.Serializable; -import java.util.List; +import java.util.Set; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; public interface TopicFilter extends Serializable { /** - * Get the Kafka TopicPartitions passed by this filter. + * Get the Kafka TopicPartitions subscribed to by this set of spouts. * @param consumer The Kafka consumer to use to read the list of existing partitions - * @return The Kafka partitions passed by this filter. + * @return The Kafka partitions this set of spouts should subscribe to */ - List getFilteredTopicPartitions(KafkaConsumer consumer); + Set getAllSubscribedPartitions(KafkaConsumer consumer); /** * Get the topics string. diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java index 9ba76d79dd4..6e56bb5627d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java @@ -24,8 +24,6 @@ import java.util.Map; import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,45 +31,39 @@ * Wraps transaction batch information. */ public class KafkaTridentSpoutBatchMetadata implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class); private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer(); - - public static final String TOPIC_PARTITION_KEY = "tp"; + public static final String FIRST_OFFSET_KEY = "firstOffset"; public static final String LAST_OFFSET_KEY = "lastOffset"; - - // topic partition of this batch - private final TopicPartition topicPartition; + // first offset of this batch - private final long firstOffset; + private final long firstOffset; // last offset of this batch private final long lastOffset; /** * Builds a metadata object. 
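Tying the new pieces together: a hedged sketch of how TopicAssigner is meant to be driven, mirroring KafkaSpout.refreshAssignment() earlier in this patch (the consumer, topicFilter, partitioner, context, and listener variables are assumed to be in scope):

    // Discover all matching partitions, sort them consistently across tasks,
    // narrow to this task's share, then hand the result to the assigner.
    Set<TopicPartition> all = topicFilter.getAllSubscribedPartitions(consumer);
    List<TopicPartition> sorted = new ArrayList<>(all);
    Collections.sort(sorted, TopicPartitionComparator.INSTANCE);
    Set<TopicPartition> mine = partitioner.getPartitionsForThisTask(sorted, context);
    // assignPartitions() is a no-op unless the assignment actually changed;
    // otherwise it fires onPartitionsRevoked/onPartitionsAssigned around assign().
    new TopicAssigner().assignPartitions(consumer, mine, listener);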
- * @param topicPartition The topic partition + * * @param firstOffset The first offset for the batch * @param lastOffset The last offset for the batch */ - public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) { - this.topicPartition = topicPartition; + public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset) { this.firstOffset = firstOffset; this.lastOffset = lastOffset; } /** * Builds a metadata object from a non-empty set of records. - * @param topicPartition The topic partition the records belong to. + * * @param consumerRecords The non-empty set of records. */ - public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords consumerRecords) { - Validate.notNull(consumerRecords.records(topicPartition)); - List> records = consumerRecords.records(topicPartition); - Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata"); - - this.topicPartition = topicPartition; - firstOffset = records.get(0).offset(); - lastOffset = records.get(records.size() - 1).offset(); + public KafkaTridentSpoutBatchMetadata(List> consumerRecords) { + Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata"); + + firstOffset = consumerRecords.get(0).offset(); + lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset(); LOG.debug("Created {}", this.toString()); } @@ -83,28 +75,22 @@ public long getLastOffset() { return lastOffset; } - public TopicPartition getTopicPartition() { - return topicPartition; - } - /** * Constructs a metadata object from a Map in the format produced by {@link #toMap() }. + * * @param map The source map * @return A new metadata object */ public static KafkaTridentSpoutBatchMetadata fromMap(Map map) { - Map topicPartitionMap = (Map)map.get(TOPIC_PARTITION_KEY); - TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap); - return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(), - ((Number)map.get(LAST_OFFSET_KEY)).longValue()); + return new KafkaTridentSpoutBatchMetadata(((Number) map.get(FIRST_OFFSET_KEY)).longValue(), + ((Number) map.get(LAST_OFFSET_KEY)).longValue()); } - + /** * Writes this metadata object to a Map so Trident can read/write it to Zookeeper. 
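A quick round-trip sketch of that map format (the offset values are invented):

    KafkaTridentSpoutBatchMetadata meta = new KafkaTridentSpoutBatchMetadata(10, 99);
    Map<String, Object> asMap = meta.toMap();  // {firstOffset=10, lastOffset=99}
    KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(asMap);
    // restored.getFirstOffset() == 10 and restored.getLastOffset() == 99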
*/ public Map toMap() { Map map = new HashMap<>(); - map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition)); map.put(FIRST_OFFSET_KEY, firstOffset); map.put(LAST_OFFSET_KEY, lastOffset); return map; @@ -112,10 +98,9 @@ public Map toMap() { @Override public final String toString() { - return super.toString() - + "{topicPartition=" + topicPartition - + ", firstOffset=" + firstOffset - + ", lastOffset=" + lastOffset - + '}'; + return "KafkaTridentSpoutBatchMetadata{" + + "firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index a45eff8aaad..86535be2600 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -32,15 +32,18 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.RecordTranslator; -import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.kafka.spout.TopicPartitionComparator; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; @@ -59,49 +62,39 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident // Kafka private final KafkaConsumer kafkaConsumer; - - // Bookkeeping - private final KafkaTridentSpoutManager kafkaManager; + private final KafkaSpoutConfig kafkaSpoutConfig; + private final TopicAssigner topicAssigner; + // set of topic-partitions for which first poll has already occurred, and the first polled txid private final Map firstPollTransaction = new HashMap<>(); - // Declare some KafkaTridentSpoutManager references for convenience private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; private final RecordTranslator translator; - private final Timer refreshSubscriptionTimer; private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); - - private TopologyContext topologyContext; + private final TopologyContext topologyContext; /** * Create a new Kafka spout emitter. 
- * @param kafkaManager The Kafka consumer manager to use + * @param kafkaSpoutConfig The Kafka spout config * @param topologyContext The topology context - * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription */ - public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager kafkaManager, TopologyContext topologyContext, - Timer refreshSubscriptionTimer) { - this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext); - this.kafkaManager = kafkaManager; + public KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext) { + this(kafkaSpoutConfig, topologyContext, new KafkaConsumerFactoryDefault<>(), new TopicAssigner()); + } + + KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext, + KafkaConsumerFactory consumerFactory, TopicAssigner topicAssigner) { + this.kafkaSpoutConfig = kafkaSpoutConfig; + this.kafkaConsumer = consumerFactory.createConsumer(kafkaSpoutConfig); this.topologyContext = topologyContext; - this.refreshSubscriptionTimer = refreshSubscriptionTimer; - this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator(); - - final KafkaSpoutConfig kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); + this.translator = kafkaSpoutConfig.getTranslator(); + this.topicAssigner = topicAssigner; this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); LOG.debug("Created {}", this.toString()); } - /** - * Creates instance of this class with default 500 millisecond refresh subscription timer. - */ - public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager kafkaManager, TopologyContext topologyContext) { - this(kafkaManager, topologyContext, new Timer(500, - kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); - } - @Override public Map emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map lastBatch) { @@ -115,11 +108,10 @@ public Map emitPartitionBatch(TransactionAttempt tx, TridentColl KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta; Collection pausedTopicPartitions = Collections.emptySet(); - if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { - LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " - + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " - + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, - kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); + if (!assignments.contains(currBatchPartition.getTopicPartition())) { + throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned to." + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+ + " The current partition is [" + currBatchPartition + "], the assigned partitions are [" + assignments + "]."); } else { try { // pause other topic-partitions to only poll from current topic-partition @@ -127,18 +119,13 @@ public Map emitPartitionBatch(TransactionAttempt tx, TridentColl seek(currBatchTp, lastBatchMeta, tx.getTransactionId()); - // poll - if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { - kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment(); - } - final ConsumerRecords records = kafkaConsumer.poll(pollTimeoutMs); LOG.debug("Polled [{}] records from Kafka.", records.count()); if (!records.isEmpty()) { emitTuples(collector, records); // build new metadata - currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records); + currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp)); } } finally { kafkaConsumer.resume(pausedTopicPartitions); @@ -196,7 +183,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet } final long fetchOffset = kafkaConsumer.position(tp); - LOG.debug("Set [fetchOffset = {}]", fetchOffset); + LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp); return fetchOffset; } @@ -215,25 +202,13 @@ private Collection pauseTopicPartitions(TopicPartition excludedT return pausedTopicPartitions; } - @Override - public void refreshPartitions(List partitionResponsibilities) { - LOG.trace("Refreshing of topic-partitions handled by Kafka. " - + "No action taken by this method for topic partitions {}", partitionResponsibilities); - } - - /** - * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions - * for this task must be assigned to the Kafka consumer running on this task. - * - * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator} - * @return ordered list of topic partitions for this task - */ @Override public List getOrderedPartitions(final List> allPartitionInfo) { - List allTopicPartitions = allPartitionInfo.stream() + List sortedPartitions = allPartitionInfo.stream() .map(map -> tpSerializer.fromMap(map)) + .sorted(TopicPartitionComparator.INSTANCE) .collect(Collectors.toList()); - final List allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions); + final List allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions); LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. 
Total tasks [{}] ", allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); return allPartitions; @@ -241,21 +216,31 @@ public List getOrderedPartitions(final List getPartitionsForTask(int taskId, int numTasks, - List> allPartitionInfo) { - final Set assignedTps = kafkaConsumer.assignment(); + List allPartitionInfoSorted) { + List tps = allPartitionInfoSorted.stream() + .map(kttp -> kttp.getTopicPartition()) + .collect(Collectors.toList()); + final Set assignedTps = kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(tps, topologyContext); LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps); final List taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps); - LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId); return taskTps; } + + + @Override + public void refreshPartitions(List partitionResponsibilities) { + Set assignedTps = partitionResponsibilities.stream() + .map(kttp -> kttp.getTopicPartition()) + .collect(Collectors.toSet()); + topicAssigner.assignPartitions(kafkaConsumer, assignedTps, new KafkaSpoutConsumerRebalanceListener()); + LOG.debug("Assigned partitions [{}] to this task", assignedTps); + } private List newKafkaTridentSpoutTopicPartitions(Collection tps) { - final List kttp = new ArrayList<>(tps == null ? 0 : tps.size()); - if (tps != null) { - for (TopicPartition tp : tps) { - LOG.trace("Added topic-partition [{}]", tp); - kttp.add(new KafkaTridentSpoutTopicPartition(tp)); - } + final List kttp = new ArrayList<>(tps.size()); + for (TopicPartition tp : tps) { + LOG.trace("Added topic-partition [{}]", tp); + kttp.add(new KafkaTridentSpoutTopicPartition(tp)); } return kttp; } @@ -273,7 +258,24 @@ public void close() { @Override public final String toString() { return super.toString() - + "{kafkaManager=" + kafkaManager + + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}'; } + + /** + * Just logs reassignments. + */ + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection partitions) { + LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + } + } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java deleted file mode 100644 index b5138c25855..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.trident; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.kafka.spout.RecordTranslator; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaTridentSpoutManager implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class); - - // Kafka - private transient KafkaConsumer kafkaConsumer; - - // Bookkeeping - private final KafkaSpoutConfig kafkaSpoutConfig; - // Declare some KafkaSpoutConfig references for convenience - private Fields fields; - - /** - * Create a KafkaConsumer manager for the trident spout. - * @param kafkaSpoutConfig The consumer config - */ - public KafkaTridentSpoutManager(KafkaSpoutConfig kafkaSpoutConfig) { - this.kafkaSpoutConfig = kafkaSpoutConfig; - this.fields = getFields(); - LOG.debug("Created {}", this.toString()); - } - - KafkaConsumer createAndSubscribeKafkaConsumer(TopologyContext context) { - kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps()); - - kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context); - return kafkaConsumer; - } - - KafkaConsumer getKafkaConsumer() { - return kafkaConsumer; - } - - Set getTopicPartitions() { - return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions(); - } - - final Fields getFields() { - if (fields == null) { - RecordTranslator translator = kafkaSpoutConfig.getTranslator(); - Fields fs = null; - for (String stream : translator.streams()) { - if (fs == null) { - fs = translator.getFieldsFor(stream); - } else { - if (!fs.equals(translator.getFieldsFor(stream))) { - throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields"); - } - } - } - fields = fs; - } - LOG.debug("OutputFields = {}", fields); - return fields; - } - - KafkaSpoutConfig getKafkaSpoutConfig() { - return kafkaSpoutConfig; - } - - @Override - public final String toString() { - return super.toString() - + "{kafkaConsumer=" + kafkaConsumer - + ", kafkaSpoutConfig=" + kafkaSpoutConfig - + '}'; - } - - private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { - @Override - public void onPartitionsRevoked(Collection partitions) { - LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions); - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions); - LOG.info("Partitions reassignment. 
[consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - } - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java index 8d33e394813..3257be799e6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java @@ -20,8 +20,8 @@ import java.util.List; import java.util.Map; -import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.RecordTranslator; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.tuple.Fields; @@ -34,26 +34,22 @@ public class KafkaTridentSpoutOpaque implements IOpaquePartitionedTridentSp private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class); - private final KafkaTridentSpoutManager kafkaManager; - - public KafkaTridentSpoutOpaque(KafkaSpoutConfig conf) { - this(new KafkaTridentSpoutManager<>(conf)); - } + private final KafkaSpoutConfig kafkaSpoutConfig; - public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager kafkaManager) { - this.kafkaManager = kafkaManager; + public KafkaTridentSpoutOpaque(KafkaSpoutConfig kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; LOG.debug("Created {}", this.toString()); } @Override public Emitter>, KafkaTridentSpoutTopicPartition, Map> getEmitter( Map conf, TopologyContext context) { - return new KafkaTridentSpoutEmitter<>(kafkaManager, context); + return new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context); } @Override public Coordinator>> getCoordinator(Map conf, TopologyContext context) { - return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager); + return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaSpoutConfig); } @Override @@ -63,7 +59,13 @@ public Map getComponentConfiguration() { @Override public Fields getOutputFields() { - final Fields outputFields = kafkaManager.getFields(); + RecordTranslator translator = kafkaSpoutConfig.getTranslator(); + int numStreams = translator.streams().size(); + if (numStreams > 1) { + throw new IllegalStateException("Trident spouts must have at most one output stream," + + " found streams [" + translator.streams() + "]"); + } + final Fields outputFields = translator.getFieldsFor(translator.streams().get(0)); LOG.debug("OutputFields = {}", outputFields); return outputFields; } @@ -71,6 +73,6 @@ public Fields getOutputFields() { @Override public final String toString() { return super.toString() - + "{kafkaManager=" + kafkaManager + '}'; + + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java index 17732c2746e..3aa3a99d3f0 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java @@ -19,24 +19,46 @@ package 
org.apache.storm.kafka.spout.trident; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaTridentSpoutOpaqueCoordinator implements IOpaquePartitionedTridentSpout.Coordinator>>, Serializable { + //Initial delay for the assignment refresh timer + public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class); private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); - private final KafkaTridentSpoutManager kafkaManager; + private final KafkaSpoutConfig kafkaSpoutConfig; + private final Timer refreshAssignmentTimer; + private final KafkaConsumer consumer; + + private Set partitionsForBatch; - public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager kafkaManager) { - this.kafkaManager = kafkaManager; + /** + * Creates a new coordinator based on the given spout config. + * @param kafkaSpoutConfig The spout config to use + */ + public KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig kafkaSpoutConfig) { + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); + } + + KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory consumerFactory) { + this.kafkaSpoutConfig = kafkaSpoutConfig; + this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig); LOG.debug("Created {}", this.toString()); } @@ -48,22 +70,25 @@ public boolean isReady(long txid) { @Override public List> getPartitionsForBatch() { - final ArrayList topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions()); - LOG.debug("TopicPartitions for batch {}", topicPartitions); - return topicPartitions.stream() + if (refreshAssignmentTimer.isExpiredResetOnTrue() || partitionsForBatch == null) { + partitionsForBatch = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer); + } + LOG.debug("TopicPartitions for batch {}", partitionsForBatch); + return partitionsForBatch.stream() .map(tp -> tpSerializer.toMap(tp)) .collect(Collectors.toList()); } @Override public void close() { - LOG.debug("Closed"); // the "old" trident kafka spout is no op like this + this.consumer.close(); + LOG.debug("Closed"); } @Override public final String toString() { return super.toString() - + "{kafkaManager=" + kafkaManager + + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java index fd14bafe05d..f064b816ea7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java @@ -26,7 +26,7 @@ * {@link ISpoutPartition} that wraps {@link TopicPartition} information. */ public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable { - private TopicPartition topicPartition; + private final TopicPartition topicPartition; public KafkaTridentSpoutTopicPartition(String topic, int partition) { this(new TopicPartition(topic, partition)); @@ -56,12 +56,12 @@ public boolean equals(Object o) { KafkaTridentSpoutTopicPartition that = (KafkaTridentSpoutTopicPartition) o; - return topicPartition != null ? topicPartition.equals(that.topicPartition) : that.topicPartition == null; + return topicPartition.equals(that.topicPartition); } @Override public int hashCode() { - return topicPartition != null ? topicPartition.hashCode() : 0; + return topicPartition.hashCode(); } @Override diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java deleted file mode 100644 index 24bc1e281a3..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.trident; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Set; -import org.apache.kafka.common.TopicPartition; - -public enum KafkaTridentSpoutTopicPartitionRegistry { - INSTANCE; - - private Set topicPartitions; - - KafkaTridentSpoutTopicPartitionRegistry() { - this.topicPartitions = new LinkedHashSet<>(); - } - - public Set getTopicPartitions() { - return Collections.unmodifiableSet(topicPartitions); - } - - public void addAll(Collection topicPartitions) { - this.topicPartitions.addAll(topicPartitions); - } - - public void removeAll(Collection topicPartitions) { - this.topicPartitions.removeAll(topicPartitions); - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java deleted file mode 100644 index e41f95d3e0c..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.trident; - -import java.util.Map; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.trident.spout.IPartitionedTridentSpout; -import org.apache.storm.trident.spout.ISpoutPartition; -import org.apache.storm.tuple.Fields; - -// TODO -public class KafkaTridentSpoutTransactional - implements IPartitionedTridentSpout { - @Override - public Coordinator getCoordinator(Map conf, TopologyContext context) { - return null; - } - - @Override - public Emitter getEmitter(Map conf, TopologyContext context) { - return null; - } - - @Override - public Map getComponentConfiguration() { - return null; - } - - @Override - public Fields getOutputFields() { - return null; - } -} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java index dd2969651ba..a7b3e09a20a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java @@ -34,7 +34,6 @@ import org.junit.Rule; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.MockitoAnnotations; import java.util.HashMap; import java.util.Map; @@ -47,7 +46,15 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + public abstract class KafkaSpoutAbstractTest { + + @Rule + public MockitoRule mockito = MockitoJUnit.rule(); + @Rule public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); @@ -74,13 +81,11 @@ protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) { @Before public void setUp() { - MockitoAnnotations.initMocks(this); - spoutConfig = createSpoutConfig(); consumerSpy = createConsumerSpy(); - spout = new KafkaSpout<>(spoutConfig, createConsumerFactory()); + spout = new KafkaSpout<>(spoutConfig, createConsumerFactory(), new TopicAssigner()); simulatedTime = new Time.SimulatedTime(); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 317723db4f3..01e2e9f7a55 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -52,8 +52,11 @@ import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import 
static org.mockito.Mockito.mock; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; public class KafkaSpoutEmitTest { @@ -67,7 +70,7 @@ public class KafkaSpoutEmitTest { @Before public void setUp() { - spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java index f1acafadd0b..01ee9e4a97e 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -52,7 +52,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; public class KafkaSpoutLogCompactionSupportTest { @@ -70,7 +71,7 @@ public class KafkaSpoutLogCompactionSupportTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index a9e7c6c7284..920dedcd595 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -44,7 +44,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.CommitMetadataManager; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -77,7 +78,7 @@ public void setUp() { @Test public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
- KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .build(); KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); @@ -86,7 +87,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); - + when(consumerMock.position(partition)).thenReturn(1L); //The spout should have emitted the tuple, and must have committed it before emit @@ -121,7 +122,7 @@ private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .build(); doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); @@ -130,7 +131,7 @@ public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception @Test public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception { //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .build(); doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); @@ -164,7 +165,7 @@ private void doTestModeCannotReplayTuples(KafkaSpoutConfig spout @Test public void testAtMostOnceModeCannotReplayTuples() throws Exception { //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .setTupleTrackingEnforced(true) .build(); @@ -174,7 +175,7 @@ public void testAtMostOnceModeCannotReplayTuples() throws Exception { @Test public void testNoGuaranteeModeCannotReplayTuples() throws Exception { //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); @@ -184,7 +185,7 @@ public void testNoGuaranteeModeCannotReplayTuples() throws Exception { @Test public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) 
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .setTupleTrackingEnforced(true) .build(); @@ -214,11 +215,11 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { })); } } - + @Test public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked - KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java index 2273117e91f..5b37a8dbec4 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -38,6 +38,7 @@ import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -84,7 +85,7 @@ public void setUp() { when(consumerFactoryMock.createConsumer(any())) .thenReturn(consumerSpy) .thenReturn(postReactivationConsumerSpy); - this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock); + this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner()); } private void prepareSpout(int messageCount) throws Exception { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 676cb3d85a8..c0c10f8d604 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -49,7 +48,6 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.mockito.ArgumentMatchers.any; @@ -58,8 +56,22 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import java.util.HashSet; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import
org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; +import org.junit.Rule; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + public class KafkaSpoutRebalanceTest { + @Rule + public MockitoRule mockito = MockitoJUnit.rule(); + @Captor private ArgumentCaptor> commitCapture; @@ -69,23 +82,32 @@ public class KafkaSpoutRebalanceTest { private SpoutOutputCollector collectorMock; private KafkaConsumer consumerMock; private KafkaConsumerFactory consumerFactory; + private TopicFilter topicFilterMock; + private ManualPartitioner partitionerMock; @Before public void setUp() { - MockitoAnnotations.initMocks(this); contextMock = mock(TopologyContext.class); collectorMock = mock(SpoutOutputCollector.class); consumerMock = mock(KafkaConsumer.class); consumerFactory = (kafkaSpoutConfig) -> consumerMock; + topicFilterMock = mock(TopicFilter.class); + when(topicFilterMock.getAllSubscribedPartitions(any())) + .thenReturn(new HashSet<>()); + partitionerMock = mock(ManualPartitioner.class); + when(partitionerMock.getPartitionsForThisTask(any(), any())) + .thenReturn(new HashSet<>()); } //Returns messageIds in order of emission - private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor rebalanceListenerCapture) { + private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, TopicAssigner topicAssigner) { //Setup spout with mock consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); //Assign partitions to the spout + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + verify(topicAssigner).assignPartitions(any(), any(), rebalanceListenerCapture.capture()); ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); Set assignedPartitions = new HashSet<>(); assignedPartitions.add(partitionThatWillBeRevoked); @@ -123,21 +145,17 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them try (SimulatedTime simulatedTime = new SimulatedTime()) { - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - Subscription subscriptionMock = mock(Subscription.class); - doNothing() - .when(subscriptionMock) - .subscribe(any(), rebalanceListenerCapture.capture(), any()); - KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) + TopicAssigner assignerMock = mock(TopicAssigner.class); + KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(), consumerFactory); + .build(), consumerFactory, assignerMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); TopicPartition assignedPartition = new TopicPartition(topic, 2); //Emit a message on each partition and revoke the first partition List emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition( - spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); + spout, partitionThatWillBeRevoked, assignedPartition, assignerMock); //Ack both emitted tuples spout.ack(emittedMessageIds.get(0)); @@ -159,16 +177,12 @@ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws E @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - Subscription subscriptionMock = mock(Subscription.class); - doNothing() - .when(subscriptionMock) - .subscribe(any(), rebalanceListenerCapture.capture(), any()); + TopicAssigner assignerMock = mock(TopicAssigner.class); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) + KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) - .build(), consumerFactory); + .build(), consumerFactory, assignerMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); TopicPartition assignedPartition = new TopicPartition(topic, 2); @@ -179,7 +193,7 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws //Emit a message on each partition and revoke the first partition List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( - spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); + spout, partitionThatWillBeRevoked, assignedPartition, assignerMock); //Check that only two message ids were generated verify(retryServiceMock, times(2)).getMessageId(any(TopicPartition.class), anyLong()); @@ -200,14 +214,10 @@ public void testReassignPartitionSeeksForOnlyNewPartitions() { * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those. 
*/ - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - Subscription subscriptionMock = mock(Subscription.class); - doNothing() - .when(subscriptionMock) - .subscribe(any(), rebalanceListenerCapture.capture(), any()); - KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) + TopicAssigner assignerMock = mock(TopicAssigner.class); + KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) - .build(), consumerFactory); + .build(), consumerFactory, assignerMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition assignedPartition = new TopicPartition(topic, 1); TopicPartition newPartition = new TopicPartition(topic, 2); @@ -215,6 +225,9 @@ public void testReassignPartitionSeeksForOnlyNewPartitions() { //Setup spout with mock consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); + + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + verify(assignerMock).assignPartitions(any(), any(), rebalanceListenerCapture.capture()); //Assign partitions to the spout ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 64b69b0d3b1..0b5c580ecd7 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -18,12 +18,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,7 +38,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; -import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.mockito.ArgumentMatchers.anyList; @@ -48,10 +45,17 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; +import org.junit.Rule; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; public class KafkaSpoutRetryLimitTest { + @Rule + public MockitoRule mockito = MockitoJUnit.rule(); + private final long offsetCommitPeriodMs = 2_000; private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); @@ -69,8 +73,7 @@ public class KafkaSpoutRetryLimitTest { @Before public void setUp() { - MockitoAnnotations.initMocks(this); - spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + 
spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .setRetry(ZERO_RETRIES_RETRY_SERVICE) .build(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index d92a3a74d47..6b8b94bb4cb 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -82,7 +82,6 @@ public void setUp() { //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets. assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets)); - MockitoAnnotations.initMocks(this); spout = new KafkaSpout<>(spoutConfig); new KafkaConsumerFactoryDefault().createConsumer(spoutConfig); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index 05cfd285f94..2aed18262ef 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -41,7 +42,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.mockito.ArgumentCaptor; @@ -49,8 +52,8 @@ public class SpoutWithMockedConsumerSetupHelper { /** - * Creates, opens and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method skips - * the subscription and instead just configures the mocked consumer to act as if the specified partitions are assigned to it. + * Creates, opens and activates a KafkaSpout using a mocked consumer. The TopicFilter and ManualPartitioner should be mock objects, + * since this method short-circuits the TopicPartition assignment process and just calls onPartitionsAssigned on the rebalance listener.
* * @param <K> The Kafka key type * @param <V> The Kafka value type @@ -64,22 +67,24 @@ public class SpoutWithMockedConsumerSetupHelper { */ public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map topoConf, TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) { - Subscription subscriptionMock = spoutConfig.getSubscription(); - if (!mockingDetails(subscriptionMock).isMock()) { - throw new IllegalStateException("Use a mocked subscription when using this method, it helps avoid complex stubbing"); + TopicFilter topicFilter = spoutConfig.getTopicFilter(); + ManualPartitioner topicPartitioner = spoutConfig.getTopicPartitioner(); + if (!mockingDetails(topicFilter).isMock() || !mockingDetails(topicPartitioner).isMock()) { + throw new IllegalStateException("Use a mocked TopicFilter and a mocked ManualPartitioner when using this method; it helps avoid complex stubbing"); } Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions)); - when(consumerMock.assignment()).thenReturn(assignedPartitionsSet); + TopicAssigner assigner = mock(TopicAssigner.class); doAnswer(invocation -> { - ConsumerRebalanceListener listener = invocation.getArgument(1); + ConsumerRebalanceListener listener = invocation.getArgument(2); listener.onPartitionsAssigned(assignedPartitionsSet); return null; - }).when(subscriptionMock).subscribe(any(), any(ConsumerRebalanceListener.class), any()); + }).when(assigner).assignPartitions(any(), any(), any()); + when(consumerMock.assignment()).thenReturn(assignedPartitionsSet); KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock; - KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); + KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory, assigner); spout.open(topoConf, contextMock, collectorMock); spout.activate(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java index 3670d8acd81..4896267f86d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java @@ -24,7 +24,8 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; @@ -37,8 +38,8 @@ public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBui return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); } - public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) { - return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription)); + public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(TopicFilter topicFilter, ManualPartitioner topicPartitioner, int port) { + return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, topicFilter,
topicPartitioner)); } public static KafkaSpoutConfig.Builder setCommonSpoutConfig(KafkaSpoutConfig.Builder config) { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java index 3985619e6ed..a30a23a774f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -54,7 +55,7 @@ public void testFilter() { when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); - List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + Set matchedPartitions = filter.getAllSubscribedPartitions(consumerMock); assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java index 67411e3e5d7..01e0b3d0ac7 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -61,7 +62,7 @@ public void testFilter() { when(consumerMock.listTopics()).thenReturn(allTopics); - List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + Set matchedPartitions = filter.getAllSubscribedPartitions(consumerMock); assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java new file mode 100644 index 00000000000..f4deeba405b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; + +public class RoundRobinManualPartitionerTest { + + private TopicPartition createTp(int partition) { + return new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, partition); + } + + private Set partitionsToTps(int[] expectedPartitions) { + Set expectedTopicPartitions = new HashSet<>(); + for(int i = 0; i < expectedPartitions.length; i++) { + expectedTopicPartitions.add(createTp(expectedPartitions[i])); + } + return expectedTopicPartitions; + } + + @Test + public void testRoundRobinPartitioning() { + List allPartitions = new ArrayList<>(); + for(int i = 0; i < 11; i++) { + allPartitions.add(createTp(i)); + } + List contextMocks = new ArrayList<>(); + String thisComponentId = "A spout"; + List allTasks = Arrays.asList(new Integer[]{0, 1, 2}); + for(int i = 0; i < 3; i++) { + TopologyContext contextMock = mock(TopologyContext.class); + when(contextMock.getThisTaskIndex()).thenReturn(i); + when(contextMock.getThisComponentId()).thenReturn(thisComponentId); + when(contextMock.getComponentTasks(thisComponentId)).thenReturn(allTasks); + contextMocks.add(contextMock); + } + RoundRobinManualPartitioner partitioner = new RoundRobinManualPartitioner(); + + Set partitionsForFirstTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(0)); + assertThat(partitionsForFirstTask, is(partitionsToTps(new int[]{0, 3, 6, 9}))); + + Set partitionsForSecondTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(1)); + assertThat(partitionsForSecondTask, is(partitionsToTps(new int[]{1, 4, 7, 10}))); + + Set partitionsForThirdTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(2)); + assertThat(partitionsForThirdTask, is(partitionsToTps(new int[]{2, 5, 8}))); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java similarity index 69% rename from external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java rename to external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java index 2355283036d..96bbc1cfa93 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java @@ -16,49 +16,38 @@ 
package org.apache.storm.kafka.spout.subscription; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; +import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.task.TopologyContext; import org.junit.Test; import org.mockito.InOrder; -public class ManualPartitionSubscriptionTest { +public class TopicAssignerTest { @Test - public void testCanReassignPartitions() { - ManualPartitioner partitionerMock = mock(ManualPartitioner.class); - TopicFilter filterMock = mock(TopicFilter.class); - KafkaConsumer consumerMock = mock(KafkaConsumer.class); - ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class); - TopologyContext contextMock = mock(TopologyContext.class); - ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock); - - List onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); - List twoPartitions = new ArrayList<>(); + public void testCanReassignPartitions() { + Set onePartition = Collections.singleton(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); + Set twoPartitions = new HashSet<>(); twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1)); - when(partitionerMock.partition(anyList(), any(TopologyContext.class))) - .thenReturn(onePartition) - .thenReturn(twoPartitions); + KafkaConsumer consumerMock = mock(KafkaConsumer.class); + ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class); + TopicAssigner assigner = new TopicAssigner(); //Set the first assignment - subscription.subscribe(consumerMock, listenerMock, contextMock); + assigner.assignPartitions(consumerMock, onePartition, listenerMock); InOrder inOrder = inOrder(consumerMock, listenerMock); + inOrder.verify(listenerMock).onPartitionsRevoked(Collections.emptySet()); inOrder.verify(consumerMock).assign(new HashSet<>(onePartition)); inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition)); @@ -67,7 +56,7 @@ public void testCanReassignPartitions() { when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition)); //Update to set the second assignment - subscription.refreshAssignment(); + assigner.assignPartitions(consumerMock, twoPartitions, listenerMock); //The partition revocation hook must be called before the new partitions are assigned to the consumer, //to allow the revocation hook to commit offsets for the revoked partitions. 
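
Taken together, the subscription changes above replace the old Subscription abstraction with three small collaborators: a TopicFilter that discovers every matching TopicPartition, a ManualPartitioner that picks the subset owned by the current task, and a TopicAssigner that applies that subset via manual assignment. The sketch below shows how they compose; it mirrors the flow exercised by TopicAssignerTest and RoundRobinManualPartitionerTest above, but the topic name, the NamedTopicFilter choice, and the refreshAssignment wrapper are illustrative assumptions, not code from this patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.TopicAssigner;
    import org.apache.storm.kafka.spout.subscription.TopicFilter;
    import org.apache.storm.task.TopologyContext;

    public class AssignmentFlowSketch {
        //Hypothetical helper illustrating the assignment flow; not part of the patch.
        public static <K, V> void refreshAssignment(KafkaConsumer<K, V> consumer, TopologyContext context,
                ConsumerRebalanceListener rebalanceListener) {
            TopicFilter filter = new NamedTopicFilter("test"); //any TopicFilter implementation works here
            ManualPartitioner partitioner = new RoundRobinManualPartitioner();
            TopicAssigner assigner = new TopicAssigner();

            //1. Discover all partitions for the subscribed topics (the spout presents these
            //   to every task in the same order, so the tasks agree on the split).
            List<TopicPartition> allPartitions = new ArrayList<>(filter.getAllSubscribedPartitions(consumer));
            //2. Pick this task's share: round robin hands task i the partitions i, i+n, i+2n, ...
            Set<TopicPartition> myPartitions = partitioner.getPartitionsForThisTask(allPartitions, context);
            //3. Assign manually, firing onPartitionsRevoked before and onPartitionsAssigned after
            //   the change so offsets for revoked partitions can be committed first.
            assigner.assignPartitions(consumer, myPartitions, rebalanceListener);
        }
    }

Note how the Trident side reuses the same pieces: the coordinator only needs step 1, which is why getPartitionsForBatch works directly off kafkaSpoutConfig.getTopicFilter(), while the emitter performs steps 2 and 3 in getPartitionsForTask and refreshPartitions, as the new KafkaTridentSpoutEmitterTest below verifies.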
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java index a5c78a8572f..a15e4158a2a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java @@ -19,9 +19,9 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import java.util.Collections; +import java.util.List; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper; import org.json.simple.JSONValue; @@ -38,27 +38,24 @@ public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception * It is important that all map entries are types json-simple knows about, * since otherwise the library just calls toString on them which will likely produce invalid JSON. */ - TopicPartition tp = new TopicPartition("topic", 0); long startOffset = 10; long endOffset = 20; - KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset); + KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(startOffset, endOffset); Map map = metadata.toMap(); Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map)); KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap); - assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition())); assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset())); assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset())); } @Test public void testCreateMetadataFromRecords() { - TopicPartition tp = new TopicPartition("topic", 0); long firstOffset = 15; long lastOffset = 55; - ConsumerRecords records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1)))); + List> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1)); - KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records); + KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records); assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset)); assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset)); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java new file mode 100644 index 00000000000..6208ce4b21d --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java @@ -0,0 +1,214 @@ +/* + * Copyright 2017 The Apache Software Foundation. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.trident; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.topology.TransactionAttempt; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class KafkaTridentSpoutEmitterTest { + + @Rule + public MockitoRule mockito = MockitoJUnit.rule(); + + @Captor + public ArgumentCaptor> assignmentCaptor; + + private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); + + @Test + public void testGetOrderedPartitionsIsConsistent() { + KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( + SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) + .build(), + mock(TopologyContext.class), + mock(KafkaConsumerFactory.class), new TopicAssigner()); + + Set allPartitions = new HashSet<>(); + int numPartitions = 10; + for (int i = 0; i < numPartitions; i++) { + allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i)); + } + List> serializedPartitions = allPartitions.stream() + .map(tp -> tpSerializer.toMap(tp)) + .collect(Collectors.toList()); + + List orderedPartitions = emitter.getOrderedPartitions(serializedPartitions); + assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size())); + Collections.shuffle(serializedPartitions); + List 
secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions); + assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions)); + + serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions))); + List orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions); + orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1); + assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions)); + } + + @Test + public void testGetPartitionsForTask() { + //Verify correct wrapping/unwrapping of partition and delegation of partition assignment + ManualPartitioner partitionerMock = mock(ManualPartitioner.class); + when(partitionerMock.getPartitionsForThisTask(any(), any())) + .thenAnswer(invocation -> { + List partitions = new ArrayList<>((List) invocation.getArguments()[0]); + partitions.remove(0); + return new HashSet<>(partitions); + }); + + KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( + SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1) + .build(), + mock(TopologyContext.class), + mock(KafkaConsumerFactory.class), new TopicAssigner()); + + List allPartitions = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i)); + } + List unwrappedPartitions = allPartitions.stream() + .map(kttp -> kttp.getTopicPartition()) + .collect(Collectors.toList()); + + List partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions); + verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class)); + allPartitions.remove(0); + assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions))); + } + + @Test + public void testAssignPartitions() { + //Verify correct unwrapping of partitions and delegation of assignment + KafkaConsumer consumerMock = mock(KafkaConsumer.class); + KafkaConsumerFactory consumerFactory = spoutConfig -> consumerMock; + TopicAssigner assignerMock = mock(TopicAssigner.class); + + KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( + SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) + .build(), + mock(TopologyContext.class), + consumerFactory, assignerMock); + + List allPartitions = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i)); + } + Set unwrappedPartitions = allPartitions.stream() + .map(kttp -> kttp.getTopicPartition()) + .collect(Collectors.toSet()); + + emitter.refreshPartitions(allPartitions); + + verify(assignerMock).assignPartitions(any(KafkaConsumer.class), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class)); + } + + private Map doEmitBatchTest(KafkaConsumer consumerMock, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map previousBatchMeta) { + when(consumerMock.assignment()).thenReturn(Collections.singleton(tp)); + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap( + tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords)))); + KafkaConsumerFactory consumerFactory = spoutConfig -> consumerMock; + 
+ KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( + SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) + .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) + .build(), + mock(TopologyContext.class), + consumerFactory, new TopicAssigner()); + + TransactionAttempt txid = new TransactionAttempt(10L, 0); + KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp); + return emitter.emitPartitionBatch(txid, collectorMock, kttp, previousBatchMeta); + } + + @Test + public void testEmitBatchWithNullMeta() { + //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct + KafkaConsumer consumerMock = mock(KafkaConsumer.class); + TridentCollector collectorMock = mock(TridentCollector.class); + TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + long firstOffset = 0; + int numRecords = 10; + Map batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, null); + + InOrder inOrder = inOrder(consumerMock, collectorMock); + inOrder.verify(consumerMock).seekToBeginning(Collections.singleton(tp)); + inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(collectorMock, times(numRecords)).emit(anyList()); + KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta); + assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset)); + assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1)); + } + + @Test + public void testEmitBatchWithPreviousMeta() { + //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct + KafkaConsumer consumerMock = mock(KafkaConsumer.class); + TridentCollector collectorMock = mock(TridentCollector.class); + TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + long firstOffset = 50; + int numRecords = 10; + KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1); + Map batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap()); + + InOrder inOrder = inOrder(consumerMock, collectorMock); + inOrder.verify(consumerMock).seek(tp, firstOffset); + inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(collectorMock, times(numRecords)).emit(anyList()); + KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta); + assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset)); + assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1)); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java new file mode 100644 index 00000000000..1abe551b5f1 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2017 The Apache Software Foundation. 
+    @Test
+    public void testEmitBatchWithPreviousMeta() {
+        //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 50;
+        int numRecords = 10;
+        KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1);
+        Map<String, Object> batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap());
+
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).seek(tp, firstOffset);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
new file mode 100644
index 00000000000..1abe551b5f1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaTridentSpoutOpaqueCoordinatorTest {
+
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+    @Test
+    public void testCanGetPartitions() {
+        KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+        TopicPartition expectedPartition = new TopicPartition("test", 0);
+        TopicFilter mockFilter = mock(TopicFilter.class);
+        when(mockFilter.getAllSubscribedPartitions(any())).thenReturn(Collections.singleton(expectedPartition));
+
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                .build();
+        KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+        List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+        List<TopicPartition> tps = deserializePartitions(partitionsForBatch);
+
+        verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+        assertThat(tps, contains(expectedPartition));
+
+    }
+
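+    // The coordinator caches the partition set between batches and only re-queries the
+    // TopicFilter once the partition refresh timer has expired, which is why this test
+    // runs under SimulatedTime and advances the clock past the refresh period.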
+    @Test
+    public void testCanUpdatePartitions() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+            TopicPartition expectedPartition = new TopicPartition("test", 0);
+            TopicPartition addedLaterPartition = new TopicPartition("test-2", 0);
+            HashSet<TopicPartition> allPartitions = new HashSet<>();
+            allPartitions.add(expectedPartition);
+            allPartitions.add(addedLaterPartition);
+            TopicFilter mockFilter = mock(TopicFilter.class);
+            when(mockFilter.getAllSubscribedPartitions(any()))
+                .thenReturn(Collections.singleton(expectedPartition))
+                .thenReturn(allPartitions);
+
+            KafkaSpoutConfig<String, String> spoutConfig =
+                SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                    .build();
+            KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+            List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+            List<TopicPartition> firstBatchTps = deserializePartitions(partitionsForBatch);
+
+            verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+            assertThat(firstBatchTps, contains(expectedPartition));
+
+            Time.advanceTime(KafkaTridentSpoutOpaqueCoordinator.TIMER_DELAY_MS + spoutConfig.getPartitionRefreshPeriodMs());
+
+            List<Map<String, Object>> partitionsForSecondBatch = coordinator.getPartitionsForBatch();
+
+            List<TopicPartition> secondBatchTps = deserializePartitions(partitionsForSecondBatch);
+            verify(mockFilter, times(2)).getAllSubscribedPartitions(mockConsumer);
+            assertThat(new HashSet<>(secondBatchTps), is(allPartitions));
+
+        }
+    }
+
+    private List<TopicPartition> deserializePartitions(List<Map<String, Object>> tps) {
+        return tps.stream()
+            .map(map -> tpSerializer.fromMap(map))
+            .collect(Collectors.toList());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 2260d6644dc..3886a821abc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -896,12 +896,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-           <dependency>
-               <groupId>com.google.auto.service</groupId>
-               <artifactId>auto-service</artifactId>
-               <version>${auto-service.version}</version>
-               <optional>true</optional>
-           </dependency>
+            <dependency>
+                <groupId>com.google.auto.service</groupId>
+                <artifactId>auto-service</artifactId>
+                <version>${auto-service.version}</version>
+                <optional>true</optional>
+            </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 5fe3c6578e0..8dd13016b36 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -72,23 +72,30 @@ interface Emitter {
         /**
          * This method is called when this task is responsible for a new set of partitions. Should be used
          * to manage things like connections to brokers.
+         * @param partitionResponsibilities The partitions assigned to this task
          */
         void refreshPartitions(List<Partition> partitionResponsibilities);
 
         /**
-         * @return The oredered list of partitions being processed by all the tasks
+         * Sorts the partition info to produce an ordered list of partitions.
+         * @param allPartitionInfo The partition info for all partitions being processed by all spout tasks
+         * @return The ordered list of partitions being processed by all the tasks. The ordering must be consistent for all tasks.
          */
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
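+        // The default getPartitionsForTask below distributes the sorted partitions
+        // round-robin over the tasks: e.g. with numTasks = 3 and partitions [p0..p5],
+        // task 0 is assigned [p0, p3], task 1 [p1, p4] and task 2 [p2, p5].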
 
         /**
+         * Get the partitions assigned to this task.
+         * @param taskId The id of this task
+         * @param numTasks The number of tasks for this spout
+         * @param allPartitionInfoSorted The partition info for all partitions being processed by all spout tasks,
+         *     sorted according to {@link #getOrderedPartitions(java.lang.Object)}
          * @return The list of partitions that are to be processed by the task with id {@code taskId}
          */
-        default List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo){
-            final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
-            final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
-            if (orderedPartitions != null) {
-                for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
-                    taskPartitions.add(orderedPartitions.get(i));
+        default List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted){
+            final List<Partition> taskPartitions = new ArrayList<>(allPartitionInfoSorted == null ? 0 : allPartitionInfoSorted.size());
+            if (allPartitionInfoSorted != null) {
+                for (int i = taskId; i < allPartitionInfoSorted.size(); i += numTasks) {
+                    taskPartitions.add(allPartitionInfoSorted.get(i));
                 }
             }
             return taskPartitions;
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index e7bf70a5ff1..f381318257a 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -111,13 +111,13 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
         if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
             _partitionStates.clear();
-            final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+            final List<ISpoutPartition> sortedPartitions = _emitter.getOrderedPartitions(coordinatorMeta);
+            final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, sortedPartitions);
             for (ISpoutPartition partition : taskPartitions) {
                 _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
             }
+            _emitter.refreshPartitions(taskPartitions);
 
-            // refresh all partitions for backwards compatibility with old spout
-            _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
             _savedCoordinatorMeta = coordinatorMeta;
             _changedMeta = true;
         }
@@ -137,7 +137,9 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
             EmitterPartitionState s = e.getValue();
             s.rotatingState.removeState(tx.getTransactionId());
             Object lastMeta = prevCached.get(id);
-            if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
+            if(lastMeta==null) {
+                lastMeta = s.rotatingState.getLastState();
+            }
             Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
             metas.put(id, meta);
         }