diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index 9166cb9117f..93d622ea25c 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -247,12 +247,9 @@ streams. If you are doing this for Trident a value must be in the List returned otherwise trident can throw exceptions. -### Manual Partition Control (ADVANCED) +### Manual Partition Assignment (ADVANCED) -By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions. -This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing -Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again -please be careful when using these or implementing your own. +By default, KafkaSpout instances are assigned partitions using a round-robin strategy. If you need to customize partition assignment, implement the `ManualPartitioner` interface. Pass your implementation to the `ManualPartitionSubscription` constructor, and set the resulting `Subscription` in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Take care when supplying a custom implementation: an incorrect `ManualPartitioner` can leave some partitions unread, or read concurrently by multiple spout instances. See `RoundRobinManualPartitioner` for an example of how to implement this functionality. ## Use the Maven Shade Plugin to Build the Uber Jar
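To make the new docs section concrete, here is a minimal caller-side sketch of the wiring it describes, using only constructors that appear in this patch; the broker address, consumer group, topic name, and the `ManualAssignmentExample`/`buildSpout` wrapper are illustrative placeholders, not part of the patch:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.NamedTopicFilter;
import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;

public class ManualAssignmentExample {

    // Builds a spout reading "my-topic" with explicit round-robin partition
    // assignment. This is the same Subscription the plain topic-based Builder
    // constructors now create by default; substitute a custom ManualPartitioner
    // to change how partitions are distributed over spout tasks.
    public static KafkaSpout<String, String> buildSpout() {
        ManualPartitionSubscription subscription = new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new NamedTopicFilter("my-topic"));
        KafkaSpoutConfig<String, String> spoutConfig =
            new KafkaSpoutConfig.Builder<String, String>("kafkabroker:9092", subscription)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup")
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .build();
        return new KafkaSpout<>(spoutConfig);
    }
}
```

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 2b5a81afee1..833ce4ab364 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -55,12 +56,6 @@ public class KafkaSpoutConfig implements Serializable { public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); - /** - * Retry in a tight loop (keep unit tests fasts) do not use in production. - */ - public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0), - DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0)); // Kafka consumer configuration private final Map kafkaProps; @@ -128,7 +123,7 @@ public static enum FirstPollOffsetStrategy { public static class Builder { private final Map kafkaProps; - private Subscription subscription; + private final Subscription subscription; private final SerializableDeserializer keyDes; private final Class> keyDesClazz; private final SerializableDeserializer valueDes; @@ -143,15 +138,16 @@ public static class Builder { private boolean emitNullTuples = false; public Builder(String bootstrapServers, String...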
topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } public Builder(String bootstrapServers, Collection topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), + new NamedTopicFilter(new HashSet(topics)))); } public Builder(String bootstrapServers, Pattern topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new PatternSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -161,7 +157,7 @@ public Builder(String bootstrapServers, Pattern topics) { */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer keyDes, SerializableDeserializer valDes, String... topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } /** @@ -171,7 +167,7 @@ public Builder(String bootstrapServers, SerializableDeserializer keyDes, Seri */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer keyDes, SerializableDeserializer valDes, Collection topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet(topics)))); } /** @@ -181,7 +177,7 @@ public Builder(String bootstrapServers, SerializableDeserializer keyDes, Seri */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer keyDes, SerializableDeserializer valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -199,7 +195,7 @@ public Builder(String bootstrapServers, SerializableDeserializer keyDes, Seri */ @Deprecated public Builder(String bootstrapServers, Class> keyDes, Class> valDes, String... 
topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } /** @@ -207,7 +203,7 @@ public Builder(String bootstrapServers, Class> keyDes, */ @Deprecated public Builder(String bootstrapServers, Class> keyDes, Class> valDes, Collection topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet(topics)))); } /** @@ -215,7 +211,7 @@ public Builder(String bootstrapServers, Class> keyDes, */ @Deprecated public Builder(String bootstrapServers, Class> keyDes, Class> valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -479,7 +475,6 @@ public Builder setMaxUncommittedOffsets(int maxUncommittedOffsets) { * documentation in {@link FirstPollOffsetStrategy} * * @param firstPollOffsetStrategy Offset used by Kafka spout first poll - * */ public Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) { this.firstPollOffsetStrategy = firstPollOffsetStrategy; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java index 61b98a8d144..6bc4bea1e75 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; import org.apache.storm.task.TopologyContext; public class ManualPartitionSubscription extends Subscription { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java index 0abd6c81027..f9a6869fd5e 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kafka.spout; import java.util.List; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java index 34091844f1f..3bb715278c7 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java @@ -30,8 +30,10 @@ import org.slf4j.LoggerFactory; /** - * Subscribe to all topics that follow a given list of values + * Subscribe to all topics that follow a given list of values. 
+ * Subscribe to all topics that follow a given list of values. + * @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead */ +@Deprecated public class NamedSubscription extends Subscription { private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); private static final long serialVersionUID = 3438543305215813839L; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java index 9a8de0f6b8f..dc9f9e381e2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java @@ -26,8 +26,10 @@ import org.slf4j.LoggerFactory; /** - * Subscribe to all topics that match a given pattern + * Subscribe to all topics that match a given pattern. + * @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead */ +@Deprecated public class PatternSubscription extends Subscription { private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class); private static final long serialVersionUID = 3438543305215813839L;
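Both deprecation notes point at the same replacement, so a short migration sketch may help; the topic name, pattern, and `SubscriptionMigrationExample` wrapper are placeholders, not part of the patch:

```java
import java.util.regex.Pattern;
import org.apache.storm.kafka.spout.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.NamedTopicFilter;
import org.apache.storm.kafka.spout.PatternTopicFilter;
import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.Subscription;

public class SubscriptionMigrationExample {

    // Was: new NamedSubscription("events")
    public static Subscription replaceNamedSubscription() {
        return new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new NamedTopicFilter("events"));
    }

    // Was: new PatternSubscription(Pattern.compile("events\\..*"))
    public static Subscription replacePatternSubscription() {
        return new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new PatternTopicFilter(Pattern.compile("events\\..*")));
    }
}
```

diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index b7737c7eb94..17ba3782dd4 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -26,16 +26,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -50,58 +49,38 @@ public class KafkaSpoutCommitTest { private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig spoutConfig; @Captor private ArgumentCaptor> commitCapture; - private void setupSpout(Set assignedPartitions) { + @Before + public void setUp() { MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory() { - @Override - public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { - return consumerMock; - } - }; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture =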
ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testCommitSuccessWithOffsetVoids() { //Verify that the commit logic can handle offset voids try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); // Offsets emitted are 0,1,2,3,4,<void>,8,9 for (int i = 0; i < 5; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } for (int i = 8; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 447f8c4628b..e8e93b08787 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -17,7 +17,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; @@ -31,14 +30,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.junit.Test; @@ -53,6 +49,7 @@ import java.util.HashSet; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.mockito.InOrder; public class KafkaSpoutEmitTest { @@ -63,50 +60,30 @@ public class KafkaSpoutEmitTest { private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig
spoutConfig; - private void setupSpout(Set assignedPartitions) { + @Before + public void setUp() { spoutConfig = getKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); - consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory() { - @Override - public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { - return consumerMock; - } - }; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); for (int i = 0; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); @@ -119,17 +96,17 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); @@ -184,13 +161,13 @@ public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenR //Emit maxUncommittedOffsets messages, and fail only the last. 
Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> firstPollRecords = new HashMap<>(); List> firstPollRecordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } firstPollRecords.put(partition, firstPollRecordsForPartition); @@ -198,13 +175,13 @@ public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenR Map>> secondPollRecords = new HashMap<>(); List> secondPollRecordsForPartition = new ArrayList<>(); for(int i = 0; i < maxPollRecords; i++) { - secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); + secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); } secondPollRecords.put(partition, secondPollRecordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPollRecords)) - .thenReturn(new ConsumerRecords(secondPollRecords)); + .thenReturn(new ConsumerRecords<>(firstPollRecords)) + .thenReturn(new ConsumerRecords<>(secondPollRecords)); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) { spout.nextTuple(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index bd6e5826a31..899619062c3 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -17,11 +17,11 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -81,14 +81,11 @@ public KafkaConsumer createConsumer(KafkaSpoutConfig emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) { - //Setup spout with mock consumer so we can get at the rebalance listener + private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor rebalanceListenerCapture) { + //Setup spout with mock 
consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - //Assign partitions to the spout ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); List assignedPartitions = new ArrayList<>(); @@ -102,9 +99,9 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti Map>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords<>(firstPartitionRecords)) + .thenReturn(new ConsumerRecords<>(secondPartitionRecords)) + .thenReturn(new ConsumerRecords<>(new HashMap>>())); //Emit the messages spout.nextTuple(); @@ -129,7 +126,12 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactoryMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -137,7 +139,8 @@ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws E TopicPartition assignedPartition = new TopicPartition(topic, 2); //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Ack both emitted tuples spout.ack(emittedMessageIds.get(0)); @@ -159,8 +162,13 @@ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws E @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + KafkaSpout spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactoryMock); @@ -173,7 +181,8 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Check that only two message ids were generated verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class)); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 831e383cd67..79f73981902 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -30,85 +30,71 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; public class KafkaSpoutRetryLimitTest { - + private final long offsetCommitPeriodMs = 2_000; private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer consumerMock; - private KafkaSpout spout; - private KafkaSpoutConfig spoutConfig; - + private KafkaSpoutConfig spoutConfig; + public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), - 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); - - private void setupSpoutWithNoRetry(Set assignedPartitions) { + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + @Captor + private ArgumentCaptor> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .setRetry(ZERO_RETRIES_RETRY_SERVICE) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setRetry(ZERO_RETRIES_RETRY_SERVICE) + .build(); consumerMock = 
mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = new KafkaConsumerFactory() { - @Override - public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { - return consumerMock; - } - }; - - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } - + @Test public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpoutWithNoRetry(Collections.singleton(partition)); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map>> records = new HashMap<>(); List> recordsForPartition = new ArrayList<>(); int lastOffset = 3; for (int i = 0; i <= lastOffset; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); - + when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); - + .thenReturn(new ConsumerRecords<>(records)); + for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); } - + ArgumentCaptor messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); - + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); } @@ -116,16 +102,15 @@ public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { // Advance time and then trigger call to kafka consumer commit Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); spout.nextTuple(); - - ArgumentCaptor committedOffsets=ArgumentCaptor.forClass(Map.class); + InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(committedOffsets.capture()); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); //verify that Offset 3 was committed for the given TopicPartition - assertTrue(committedOffsets.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset()); + assertTrue(commitCapture.getValue().containsKey(partition)); + assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } - -} \ No newline at end of file + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 00b973cb3af..ccb2a6c410e 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -22,6 +22,7 @@ import static 
org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -29,8 +30,10 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,7 +63,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) @@ -95,6 +98,8 @@ private void populateTopicData(String topicName, int msgCount) throws Exception private void initializeSpout(int msgCount) throws Exception { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java index e97c7e1f2a7..fe3325cf2f5 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -16,10 +16,11 @@ package org.apache.storm.kafka.spout; +import org.apache.storm.kafka.spout.NamedTopicFilter; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; - import static org.mockito.Mockito.when; import java.util.ArrayList; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java index 877efdcc9ad..335ab318032 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -16,6 +16,8 @@ package org.apache.storm.kafka.spout; +import org.apache.storm.kafka.spout.PatternTopicFilter; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 6042c80cd34..7759b3c49a9 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -36,7 +36,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyObject; @@ -48,7 +48,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,12 +85,12 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) .build(); - this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = new KafkaConsumerFactory() { @Override public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { @@ -112,6 +114,8 @@ void populateTopicData(String topicName, int msgCount) throws InterruptedExcepti private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java new file mode 100644 index 00000000000..67b1f2ccadb --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +public class SpoutWithMockedConsumerSetupHelper { + + /** + * Creates, opens and activates a KafkaSpout using a mocked consumer. + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spoutConfig The spout config to use + * @param topoConf The topo conf to pass to the spout + * @param contextMock The topo context to pass to the spout + * @param collectorMock The mocked collector to pass to the spout + * @param consumerMock The mocked consumer + * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it. + * @return The spout + */ + public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, + TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) { + + Map<String, List<PartitionInfo>> partitionInfos = new HashMap<>(); + for(TopicPartition tp : assignedPartitions) { + PartitionInfo info = new PartitionInfo(tp.topic(), tp.partition(), null, null, null); + List<PartitionInfo> infos = partitionInfos.get(tp.topic()); + if(infos == null) { + infos = new ArrayList<>(); + partitionInfos.put(tp.topic(), infos); + } + infos.add(info); + } + for(String topic : partitionInfos.keySet()) { + when(consumerMock.partitionsFor(topic)) + .thenReturn(partitionInfos.get(topic)); + } + KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() { + @Override + public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { + return consumerMock; + } + }; + + KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); + + when(contextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); + when(contextMock.getThisTaskIndex()).thenReturn(0); + + spout.open(topoConf, contextMock, collectorMock); + spout.activate(); + + verify(consumerMock).assign(assignedPartitions); + + return spout; + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java index a5c364c3008..1ab4966d32c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.storm.kafka.spout.builders; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; import java.util.List; @@ -27,7 +28,9 @@ import org.apache.storm.kafka.spout.Func; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.spout.Subscription; import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; @@ -38,6 +41,13 @@ public class SingleTopicKafkaSpoutConfiguration { public static final String STREAM = "test_stream"; public static final String TOPIC = "test"; + /** + * Retry in a tight loop (keep unit tests fast). + */ + public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + public static Config getConfig() { Config config = new Config(); config.setDebug(true); @@ -57,21 +67,29 @@ public List apply(ConsumerRecord r) { return new Values(r.topic(), r.key(), r.value()); } }; - - public static KafkaSpoutConfig.Builder getKafkaSpoutConfigBuilder(int port) { - return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC) - .setRecordTranslator(TOPIC_KEY_VALUE_FUNC, - new Fields("topic", "key", "value"), STREAM) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) - .setRetry(getRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .setPollTimeoutMs(1000); + + public static KafkaSpoutConfig.Builder getKafkaSpoutConfigBuilder(int port) { + return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); + } + + public static KafkaSpoutConfig.Builder getKafkaSpoutConfigBuilder(Subscription subscription, int port) { + return setCommonSpoutConfig(new KafkaSpoutConfig.Builder("127.0.0.1:" + port, subscription)); } - + + private static KafkaSpoutConfig.Builder setCommonSpoutConfig(KafkaSpoutConfig.Builder config) { + return config + .setRecordTranslator(TOPIC_KEY_VALUE_FUNC, + new Fields("topic", "key", "value"), STREAM) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) + .setRetry(getRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .setPollTimeoutMs(1000); + } + protected static KafkaSpoutRetryService getRetryService() { - return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE; + return UNIT_TEST_RETRY_SERVICE; } }