diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index ada861992a0..99b9ae500c1 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -240,12 +240,9 @@ streams.
 If you are doing this for Trident a value must be in the List returned otherwise trident can throw exceptions.
 
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
 
-By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round-robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or read concurrently by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
 
 ## Use the Maven Shade Plugin to Build the Uber Jar
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6f09f5f67ab..72fa52ef1a4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -24,39 +24,41 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.tuple.Fields;
 
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
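
A minimal sketch of the wiring described in the "Manual Partition Assignment" docs hunk above, using only types that appear in this patch. The broker address, group id, topic name, and class name are placeholders, and the explicit `StringDeserializer` properties are an assumption, since the `KafkaSpoutConfig.Builder` constructor that takes a `Subscription` registers no key/value deserializers:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;

public class ManualAssignmentWiringExample {

    public static KafkaSpout<String, String> buildSpout() {
        // Swap in a custom ManualPartitioner here to control which spout task
        // reads which partitions; RoundRobinManualPartitioner is the default
        // strategy this patch ships.
        KafkaSpoutConfig<String, String> spoutConfig = new KafkaSpoutConfig.Builder<String, String>(
                "localhost:9092",
                new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter("test")))
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup")
            // No deserializers are set by this constructor, so declare them explicitly.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .build();
        return new KafkaSpout<>(spoutConfig);
    }
}
```
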
*/ public class KafkaSpoutConfig implements Serializable { + private static final long serialVersionUID = 141902646130682494L; // 200ms - public static final long DEFAULT_POLL_TIMEOUT_MS = 200; + public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 30s - public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; + public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // Retry forever - public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; + public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // 10,000,000 records => 80MBs of memory footprint in the worst case - public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; + public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 2s - public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; + public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); - /** - * Retry in a tight loop (keep unit tests fasts) do not use in production. - */ - public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0), - DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0)); - + public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + // Kafka consumer configuration private final Map kafkaProps; private final Subscription subscription; @@ -73,9 +75,10 @@ public class KafkaSpoutConfig implements Serializable { /** * Creates a new KafkaSpoutConfig using a Builder. + * * @param builder The Builder to construct the KafkaSpoutConfig from */ - public KafkaSpoutConfig(Builder builder) { + public KafkaSpoutConfig(Builder builder) { this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); this.subscription = builder.subscription; this.translator = builder.translator; @@ -108,12 +111,13 @@ public static enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, - UNCOMMITTED_LATEST + UNCOMMITTED_LATEST } - - public static class Builder { + + public static class Builder { + private final Map kafkaProps; - private Subscription subscription; + private final Subscription subscription; private RecordTranslator translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; @@ -123,20 +127,22 @@ public static class Builder { private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private boolean emitNullTuples = false; - public Builder(String bootstrapServers, String ... topics) { - this(bootstrapServers, new NamedSubscription(topics)); + public Builder(String bootstrapServers, String... 
topics) { + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } - - public Builder(String bootstrapServers, Collection topics) { - this(bootstrapServers, new NamedSubscription(topics)); + + public Builder(String bootstrapServers, Set topics) { + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), + new NamedTopicFilter(topics))); } - + public Builder(String bootstrapServers, Pattern topics) { - this(bootstrapServers, new PatternSubscription(topics)); + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } - + /** * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers. + * * @param bootstrapServers The bootstrap servers the consumer will use * @param subscription The subscription defining which topics and partitions each spout instance will read. */ @@ -149,30 +155,30 @@ public Builder(String bootstrapServers, Subscription subscription) { this.subscription = subscription; this.translator = new DefaultRecordTranslator<>(); } - + /** - * Set a {@link KafkaConsumer} property. + * Set a {@link KafkaConsumer} property. */ - public Builder setProp(String key, Object value) { + public Builder setProp(String key, Object value) { kafkaProps.put(key, value); return this; } - + /** * Set multiple {@link KafkaConsumer} properties. */ - public Builder setProp(Map props) { + public Builder setProp(Map props) { kafkaProps.putAll(props); return this; } - + /** * Set multiple {@link KafkaConsumer} properties. */ - public Builder setProp(Properties props) { + public Builder setProp(Properties props) { props.forEach((key, value) -> { if (key instanceof String) { - kafkaProps.put((String)key, value); + kafkaProps.put((String) key, value); } else { throw new IllegalArgumentException("Kafka Consumer property keys must be Strings"); } @@ -183,46 +189,51 @@ public Builder setProp(Properties props) { //Spout Settings /** * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s. + * * @param pollTimeoutMs time in ms */ - public Builder setPollTimeoutMs(long pollTimeoutMs) { + public Builder setPollTimeoutMs(long pollTimeoutMs) { this.pollTimeoutMs = pollTimeoutMs; return this; } /** * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s. + * * @param offsetCommitPeriodMs time in ms */ - public Builder setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { + public Builder setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { this.offsetCommitPeriodMs = offsetCommitPeriodMs; return this; } /** - * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. - * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number - * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. - * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1. + * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this + * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets + * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. 
Note that this limit can in some cases be exceeded,
+         * but no partition will exceed this limit by more than maxPollRecords - 1.
+         *
          * @param maxUncommittedOffsets max number of records that can be be pending commit
          */
-        public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+        public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
            this.maxUncommittedOffsets = maxUncommittedOffsets;
            return this;
        }

        /**
-         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
-         * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+         * documentation in {@link FirstPollOffsetStrategy}
+         *
          * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         * */
+         */
        public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
            this.firstPollOffsetStrategy = firstPollOffsetStrategy;
            return this;
        }
-
+
        /**
         * Sets the retry service for the spout to use.
+         *
         * @param retryService the new retry service
         * @return the builder (this).
         */
@@ -238,9 +249,10 @@ public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
            this.translator = translator;
            return this;
        }
-
+
        /**
         * Configure a translator with tuples to be emitted on the default stream.
+         *
         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
         * @param fields the names of the fields extracted
         * @return this to be able to chain configuration
         */
        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
        }
-
+
        /**
         * Configure a translator with tuples to be emitted to a given stream.
+         *
         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
         * @param fields the names of the fields extracted
         * @param stream the stream to emit the tuples on
         * @return this to be able to chain configuration
         */
@@ -259,12 +272,12 @@ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List
        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
        }
-
+
        /**
-         * Sets partition refresh period in milliseconds. This is how often kafka will be polled
-         * to check for new topics and/or new partitions.
-         * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+         * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new
+         * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
         * PatternSubscription rely on kafka to handle this instead.
+         *
         * @param partitionRefreshPeriodMs time in milliseconds
         * @return the builder (this)
         */
@@ -274,8 +287,9 @@ public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs)
        }

        /**
-         * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly
-         * ack them. By default this parameter is set to false, which means that null tuples are not emitted.
+         * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
+         * this parameter is set to false, which means that null tuples are not emitted.
+ * * @param emitNullTuples sets if null tuples should or not be emitted downstream */ public Builder setEmitNullTuples(boolean emitNullTuples) { @@ -283,34 +297,36 @@ public Builder setEmitNullTuples(boolean emitNullTuples) { return this; } - public KafkaSpoutConfig build() { + public KafkaSpoutConfig build() { return new KafkaSpoutConfig<>(this); } } - - + /** * Factory method that creates a Builder with String key/value deserializers. + * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topics to subscribe to * @return The new builder */ - public static Builder builder(String bootstrapServers, String ... topics) { + public static Builder builder(String bootstrapServers, String... topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + /** * Factory method that creates a Builder with String key/value deserializers. + * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topics to subscribe to * @return The new builder */ - public static Builder builder(String bootstrapServers, Collection topics) { + public static Builder builder(String bootstrapServers, Set topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + /** * Factory method that creates a Builder with String key/value deserializers. + * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topic pattern to subscribe to * @return The new builder @@ -318,13 +334,13 @@ public static Builder builder(String bootstrapServers, Collectio public static Builder builder(String bootstrapServers, Pattern topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + private static Builder setStringDeserializers(Builder builder) { builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return builder; } - + private static Map setDefaultsAndGetKafkaProps(Map kafkaProps) { // set defaults for properties not specified if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { @@ -335,17 +351,18 @@ private static Map setDefaultsAndGetKafkaProps(Map getKafkaProps() { return kafkaProps; } - + public Subscription getSubscription() { return subscription; } - - public RecordTranslator getTranslator() { + + public RecordTranslator getTranslator() { return translator; } @@ -358,8 +375,8 @@ public long getOffsetsCommitPeriodMs() { } public boolean isConsumerAutoCommitMode() { - return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false - || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false + || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } public String getConsumerGroupId() { @@ -377,7 +394,7 @@ public int getMaxUncommittedOffsets() { public KafkaSpoutRetryService getRetryService() { return retryService; } - + public long getPartitionRefreshPeriodMs() { return partitionRefreshPeriodMs; } @@ -389,14 +406,14 @@ public boolean isEmitNullTuples() { @Override public String toString() { return "KafkaSpoutConfig{" - + "kafkaProps=" + kafkaProps - + ", pollTimeoutMs=" + pollTimeoutMs - + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs - + ", maxUncommittedOffsets=" + maxUncommittedOffsets - + ", firstPollOffsetStrategy=" + 
firstPollOffsetStrategy - + ", subscription=" + subscription - + ", translator=" + translator - + ", retryService=" + retryService - + '}'; + + "kafkaProps=" + kafkaProps + + ", pollTimeoutMs=" + pollTimeoutMs + + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + + ", maxUncommittedOffsets=" + maxUncommittedOffsets + + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + + ", subscription=" + subscription + + ", translator=" + translator + + ", retryService=" + retryService + + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java deleted file mode 100644 index 0eb48cb351e..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Subscribe to all topics that follow a given list of values. - */ -public class NamedSubscription extends Subscription { - private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); - private static final long serialVersionUID = 3438543305215813839L; - protected final Collection topics; - - public NamedSubscription(Collection topics) { - this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics)); - } - - public NamedSubscription(String ... topics) { - this(Arrays.asList(topics)); - } - - @Override - public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext unused) { - consumer.subscribe(topics, listener); - LOG.info("Kafka consumer subscribed topics {}", topics); - - // Initial poll to get the consumer registration process going. 
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - consumer.poll(0); - } - - @Override - public String getTopicsString() { - return String.join(",", topics); - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java deleted file mode 100644 index ec53f01ba29..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Subscribe to all topics that match a given pattern. - */ -public class PatternSubscription extends Subscription { - private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class); - private static final long serialVersionUID = 3438543305215813839L; - protected final Pattern pattern; - - public PatternSubscription(Pattern pattern) { - this.pattern = pattern; - } - - @Override - public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext unused) { - consumer.subscribe(pattern, listener); - LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); - - // Initial poll to get the consumer registration process going. - // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - consumer.poll(0); - } - - @Override - public String getTopicsString() { - return pattern.pattern(); - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java similarity index 96% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java index 2c65d6d8626..17512ea4749 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.util.Collections; import java.util.HashSet; @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; import org.apache.storm.task.TopologyContext; public class ManualPartitionSubscription extends Subscription { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java similarity index 97% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java index 4856687ee26..dce7fc69487 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.util.List; import org.apache.kafka.common.TopicPartition; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java similarity index 97% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java index 982828d51e6..d6e5fc2f07f 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.util.ArrayList; import java.util.Arrays; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java similarity index 97% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java index 29648748736..98f8b23b387 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.util.ArrayList; import java.util.HashSet; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java similarity index 97% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java index 4afcc49efe9..9660c77ddbe 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.util.ArrayList; import java.util.HashSet; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java similarity index 97% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java index 9c5a8c4a388..8091bfa6fe5 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.io.Serializable; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java similarity index 96% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java index 7631c8a7d45..497e3ca2b8c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.apache.storm.kafka.spout; +package org.apache.storm.kafka.spout.subscription; import java.io.Serializable; import java.util.List; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index 8dc34d46086..7258fe25d9d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -26,16 +26,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest { private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig spoutConfig; @Captor private ArgumentCaptor> commitCapture; - private void setupSpout(Set assignedPartitions) { + @Before + public void setUp() { MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testCommitSuccessWithOffsetVoids() { //Verify that the commit logic can handle offset voids try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); // Offsets emitted are 0,1,2,3,4,,8,9 for (int i = 0; i < 5; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } for (int i = 8; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 24a2eda5940..8e6d39063a4 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -16,7 +16,6 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.inOrder; @@ -32,18 +31,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -56,45 +53,30 @@ public class KafkaSpoutEmitTest { private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig spoutConfig; - private void setupSpout(Set assignedPartitions) { + @Before + public void setUp() { spoutConfig = getKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); - consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, 
contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); for (int i = 0; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); @@ -107,17 +89,17 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); @@ -172,13 +154,13 @@ public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenR //Emit maxUncommittedOffsets messages, and fail only the last. 
Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> firstPollRecords = new HashMap<>(); List> firstPollRecordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } firstPollRecords.put(partition, firstPollRecordsForPartition); @@ -186,13 +168,13 @@ public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenR Map>> secondPollRecords = new HashMap<>(); List> secondPollRecordsForPartition = new ArrayList<>(); for(int i = 0; i < maxPollRecords; i++) { - secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); + secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); } secondPollRecords.put(partition, secondPollRecordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPollRecords)) - .thenReturn(new ConsumerRecords(secondPollRecords)); + .thenReturn(new ConsumerRecords<>(firstPollRecords)) + .thenReturn(new ConsumerRecords<>(secondPollRecords)); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) { spout.nextTuple(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 2d55520d643..23630a6465a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -19,9 +19,10 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -42,6 +43,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -74,14 +76,11 @@ public void setUp() { } //Returns messageIds in order of emission - private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition 
assignedPartition) { - //Setup spout with mock consumer so we can get at the rebalance listener + private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor rebalanceListenerCapture) { + //Setup spout with mock consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - //Assign partitions to the spout ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); List assignedPartitions = new ArrayList<>(); @@ -95,9 +94,9 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti Map>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords<>(firstPartitionRecords)) + .thenReturn(new ConsumerRecords<>(secondPartitionRecords)) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); //Emit the messages spout.nextTuple(); @@ -122,7 +121,12 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactory); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -130,7 +134,8 @@ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws E TopicPartition assignedPartition = new TopicPartition(topic, 2); //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Ack both emitted tuples spout.ack(emittedMessageIds.get(0)); @@ -152,8 +157,13 @@ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws E @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + 
.when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactory); @@ -166,7 +176,8 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Check that only two message ids were generated verify(retryServiceMock, times(2)).getMessageId(anyObject()); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index d84f4dafe69..078f7a17c8b 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -30,80 +30,71 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; public class KafkaSpoutRetryLimitTest { - + private final long offsetCommitPeriodMs = 2_000; private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; - + private KafkaSpoutConfig spoutConfig; + public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), - 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); - - private void setupSpoutWithNoRetry(Set assignedPartitions) { + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + @Captor + private ArgumentCaptor> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - 
.setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .setRetry(ZERO_RETRIES_RETRY_SERVICE) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setRetry(ZERO_RETRIES_RETRY_SERVICE) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } - + @Test public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpoutWithNoRetry(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); int lastOffset = 3; for (int i = 0; i <= lastOffset; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); - + when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); - + .thenReturn(new ConsumerRecords<>(records)); + for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); } - + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); - + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); } @@ -111,16 +102,15 @@ public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { // Advance time and then trigger call to kafka consumer commit Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); spout.nextTuple(); - - ArgumentCaptor committedOffsets=ArgumentCaptor.forClass(Map.class); + InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(committedOffsets.capture()); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); //verify that Offset 3 was committed for the given TopicPartition - assertTrue(committedOffsets.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset()); + assertTrue(commitCapture.getValue().containsKey(partition)); + assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } - -} \ No newline at end of file + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 9ebdcf7fefb..261c6544dac 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -22,12 +22,15 @@ import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,7 +61,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) @@ -93,6 +96,8 @@ private void populateTopicData(String topicName, int msgCount) throws Exception private void initializeSpout(int msgCount) throws Exception { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 7f0973b6d2f..6b92de80ba5 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -20,6 +20,7 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -28,7 +29,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,12 +80,12 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) .build(); - this.consumerSpy = spy(new 
KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); } @@ -100,6 +103,8 @@ void populateTopicData(String topicName, int msgCount) throws InterruptedExcepti private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java new file mode 100644 index 00000000000..5f931bb667b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -0,0 +1,74 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +public class SpoutWithMockedConsumerSetupHelper { + + /** + * Creates, opens and activates a KafkaSpout using a mocked consumer. + * @param The Kafka key type + * @param The Kafka value type + * @param spoutConfig The spout config to use + * @param topoConf The topo conf to pass to the spout + * @param contextMock The topo context to pass to the spout + * @param collectorMock The mocked collector to pass to the spout + * @param consumerMock The mocked consumer + * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it. 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 62dbfe5122c..d2f38b03153 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,16 +25,26 @@
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
+
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
+    /**
+     * Retry in a tight loop (keep unit tests fast).
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -47,20 +58,27 @@ public static StormTopology getTopologyKafkaSpout(int port) {
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig.Builder getKafkaSpoutConfigBuilder(int port) {
-        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
-            .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
-                new Fields("topic", "key", "value"), STREAM)
-            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-            .setRetry(getRetryService())
-            .setOffsetCommitPeriodMs(10_000)
-            .setFirstPollOffsetStrategy(EARLIEST)
-            .setMaxUncommittedOffsets(250)
-            .setPollTimeoutMs(1000);
+    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
     }
-    
+
+    private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
+            new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
     protected static KafkaSpoutRetryService getRetryService() {
-        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+        return UNIT_TEST_RETRY_SERVICE;
     }
 }
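The new `getKafkaSpoutConfigBuilder(Subscription, int)` overload is what lets tests plug in a custom subscription. A minimal sketch of how a test might use it, assuming the subscription classes named in this PR (`ManualPartitionSubscription`, `RoundRobinManualPartitioner`, `NamedTopicFilter`); the port value is illustrative:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.subscription.Subscription;

// Build a manual subscription to the "test" topic with round-robin assignment,
// then reuse the common test settings applied by setCommonSpoutConfig.
Subscription subscription = new ManualPartitionSubscription(
    new RoundRobinManualPartitioner(), new NamedTopicFilter("test"));
KafkaSpoutConfig<String, String> spoutConfig =
    SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(subscription, 9092)
        .build();
```

Moving `UNIT_TEST_RETRY_SERVICE` out of `KafkaSpoutConfig` and into this test helper also keeps the tight-loop retry policy (zero delay, retry forever) out of the production API surface.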
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
similarity index 98%
rename from external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
rename to external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index e97c7e1f2a7..3985619e6ed 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -14,12 +14,11 @@
  * limitations under the License.
  */
 
-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
-
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
similarity index 98%
rename from external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
rename to external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
index 877efdcc9ad..67411e3e5d7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
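The two relocated tests exercise the topic filters that feed manual assignment. A hedged sketch of the kind of check they perform, assuming the filter exposes a method like `getFilteredTopicPartitions(KafkaConsumer)` and that `PatternTopicFilter` matches against `consumer.listTopics()`; names and stubbed data here are illustrative, not the tests' exact code:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;

// Stub a consumer that knows about two topics, only one matching the pattern.
KafkaConsumer<?, ?> consumerMock = mock(KafkaConsumer.class);
Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
allTopics.put("test-1", Collections.singletonList(new PartitionInfo("test-1", 0, null, null, null)));
allTopics.put("other", Collections.singletonList(new PartitionInfo("other", 0, null, null, null)));
when(consumerMock.listTopics()).thenReturn(allTopics);

PatternTopicFilter filter = new PatternTopicFilter(Pattern.compile("test.*"));
List<TopicPartition> matched = filter.getFilteredTopicPartitions(consumerMock);
// Expect only "test-1" partition 0 to survive filtering.
```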