From ca420a4a960c9667262833add74e231fe8d5518b Mon Sep 17 00:00:00 2001 From: Hugo Louro Date: Sat, 27 May 2017 10:02:21 -0700 Subject: [PATCH] STORM-2554: Trident Kafka Spout Refactoring to Include Manual Partition Assignment - Support manual partition assignment with changes in STORM-2541 - Improve rebalance pause/resume logic --- .../kafka/trident/KafkaProducerTopology.java | 2 +- .../storm/kafka/spout/KafkaSpoutConfig.java | 6 +- .../ManualPartitionPatternSubscription.java | 76 -------- ....java => ManualPartitionSubscription.java} | 33 ++-- .../storm/kafka/spout/ManualPartitioner.java | 3 +- .../storm/kafka/spout/NamedTopicFilter.java | 67 +++++++ .../storm/kafka/spout/PatternTopicFilter.java | 68 +++++++ .../apache/storm/kafka/spout/TopicFilter.java | 39 ++++ .../trident/KafkaTridentSpoutEmitter.java | 182 ++++++++++++------ .../trident/KafkaTridentSpoutManager.java | 36 +--- .../kafka/spout/NamedTopicFilterTest.java | 69 +++++++ .../kafka/spout/PatternTopicFilterTest.java | 74 +++++++ ...OpaquePartitionedTridentSpoutExecutor.java | 18 +- 13 files changed, 481 insertions(+), 192 deletions(-) rename external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/{ManualPartitionNamedSubscription.java => ManualPartitionSubscription.java} (69%) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java index 56bfb8348ff..67253adcdbe 100644 --- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java +++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/KafkaProducerTopology.java @@ -54,7 +54,7 @@ public static StormTopology newTopology(String brokerUrl, String topicName) { /** * @return the Kafka producer properties for the bolt that publishes sentences to Kafka.
*/ - private static Properties newProps(final String brokerUrl, final String topicName) { + static Properties newProps(final String brokerUrl, final String topicName) { return new Properties() {{ put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 43a6e0b83ec..3a03f16d02a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -78,7 +78,11 @@ public static enum FirstPollOffsetStrategy { LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - + + public static Builder builder(String bootstrapServers, Subscription subscription) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, subscription); + } + public static Builder builder(String bootstrapServers, String ... topics) { return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java index 23444776d6a..e69de29bb2d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionPatternSubscription extends PatternSubscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner parter; - private Set currentAssignment = null; - private KafkaConsumer consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; - - public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) { - super(pattern); - this.parter = parter; - } - - @Override - public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List allPartitions = new ArrayList<>(); - for (Map.Entry> entry: consumer.listTopics().entrySet()) { - if (pattern.matcher(entry.getKey()).matches()) { - for (PartitionInfo partitionInfo: entry.getValue()) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } - } - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set newAssignment = new HashSet<>(parter.partition(allPartitions, context)); - if (!newAssignment.equals(currentAssignment)) { - if (currentAssignment != null) { - listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); - } - currentAssignment = newAssignment; - consumer.assign(currentAssignment); - } - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java similarity index 69% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java index 926fdf02ec2..53e8257a732 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -18,36 +18,29 @@ package org.apache.storm.kafka.spout; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; -public class ManualPartitionNamedSubscription extends NamedSubscription { +public class ManualPartitionSubscription extends Subscription { private static final long serialVersionUID = 5633018073527583826L; private final ManualPartitioner partitioner; + private final 
TopicFilter partitionFilter; private Set currentAssignment = null; private KafkaConsumer consumer = null; private ConsumerRebalanceListener listener = null; private TopologyContext context = null; - public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection topics) { - super(topics); + public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { + this.partitionFilter = partitionFilter; this.partitioner = parter; } - public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) { - this(parter, Arrays.asList(topics)); - } - @Override public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { this.consumer = consumer; @@ -58,21 +51,21 @@ public void subscribe(KafkaConsumer consumer, ConsumerRebalanceList @Override public void refreshAssignment() { - List allPartitions = new ArrayList<>(); - for (String topic : topics) { - for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } + final List allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); + final Set newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); if (!newAssignment.equals(currentAssignment)) { + consumer.assign(newAssignment); if (currentAssignment != null) { listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); } currentAssignment = newAssignment; - consumer.assign(currentAssignment); + listener.onPartitionsAssigned(newAssignment); } } + + @Override + public String getTopicsString() { + return partitionFilter.getTopicsString(); + } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java index 4856687ee26..bd24f582e01 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java @@ -18,6 +18,7 @@ package org.apache.storm.kafka.spout; +import java.io.Serializable; import java.util.List; import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; @@ -29,7 +30,7 @@ * number of spouts to avoid missing partitions or double assigning partitions. */ @FunctionalInterface -public interface ManualPartitioner { +public interface ManualPartitioner extends Serializable { /** * Get the partitions for this assignment * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java new file mode 100644 index 00000000000..43ff7b2f950 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for the specified topics. + */ +public class NamedTopicFilter implements TopicFilter { + + private final Set topics; + + /** + * Create filter based on a set of topic names. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(Set topics) { + this.topics = Collections.unmodifiableSet(topics); + } + + /** + * Convenience constructor. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(String... topics) { + this(new HashSet<>(Arrays.asList(topics))); + } + + @Override + public List getFilteredTopicPartitions(KafkaConsumer consumer) { + final List allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java new file mode 100644 index 00000000000..a84023cdf45 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for topics matching the given {@link Pattern}. + */ +public class PatternTopicFilter implements TopicFilter { + + private final Pattern pattern; + private Set topics; + + /** + * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. + * @param pattern The Pattern to use. 
*/ + public PatternTopicFilter(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public List getFilteredTopicPartitions(KafkaConsumer consumer) { + topics = new HashSet<>(); + List allPartitions = new ArrayList<>(); + for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo: entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + topics.add(partitionInfo.topic()); + } + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } + + public String getTopicsPattern() { + return pattern.pattern(); + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java new file mode 100644 index 00000000000..65020c9d80b --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public interface TopicFilter extends Serializable { + + /** + * Get the Kafka TopicPartitions passed by this filter. + * @param consumer The Kafka consumer to use to read the list of existing partitions. + * @return The Kafka partitions passed by this filter. + */ + List getFilteredTopicPartitions(KafkaConsumer consumer); + + /** + * Gets a human-readable representation of the topics. + * @return a string representing the subscribed topics.
+ */ + String getTopicsString(); + +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 5351f7922ae..ed3935e5ae3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -62,25 +64,31 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident private final KafkaTridentSpoutManager kafkaManager; private Set firstPoll = new HashSet<>(); // set of topic-partitions for which first poll has already occurred - // Declare some KafkaTridentSpoutManager references for convenience + // Declare some KafkaTridentSpoutManager and Storm internal references for convenience private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; private final RecordTranslator translator; private final Timer refreshSubscriptionTimer; + private final KafkaSpoutConfig kafkaSpoutConfig; + private final TopologyContext topologyContext; - private TopologyContext topologyContext; + private boolean transactionInProgress; + private Collection pausedTopicPartitions; + private boolean replayTransaction; + private KafkaSpoutConsumerRebalanceListener kafkaConsListener; - public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager kafkaManager, TopologyContext topologyContext, - Timer refreshSubscriptionTimer) { - this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext); + public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager kafkaManager, + TopologyContext topologyContext, Timer refreshSubscriptionTimer) { this.kafkaManager = kafkaManager; this.topologyContext = topologyContext; this.refreshSubscriptionTimer = refreshSubscriptionTimer; - this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator(); - - final KafkaSpoutConfig kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); - this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); - this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); + kafkaConsListener = new KafkaSpoutConsumerRebalanceListener(); + kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext, kafkaConsListener); + translator = kafkaSpoutConfig.getTranslator(); + pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); + firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + pausedTopicPartitions = Collections.emptySet(); LOG.debug("Created {}", this); } @@ -92,54 +100,125 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager kafkaManager, Top kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); } + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + TopicPartition currBatchTp; // Topic Partition being processed in current batch + + @Override + public void onPartitionsRevoked(Collection partitions) { + 
log("Partitions revoked", partitions); + KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions); + + if (transactionInProgress) { + resumeTopicPartitions(); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log("Partitions reassignment", partitions); + KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions); + + if (transactionInProgress) { + if (!partitions.contains(currBatchTp)) { + replayTransaction = true; + LOG.warn("Partitions reassignment. Current batch's topic-partition [{}] " + + "no longer assigned to consumer={} of consumer-group={}. Replaying transaction.", + currBatchTp, kafkaConsumer, kafkaSpoutConfig.getConsumerGroupId()); + } else { + pauseTopicPartitions(partitions, currBatchTp); // pause topic-partitions other than current batch's tp + } + } + } + + private void log(String msg, Collection partitions) { + LOG.info("{}. [transaction-in-progress={}, currBatchTp={}, paused-topic-partitions={}, " + + "topic-partitions={}, consumer-group={}, consumer={}]", + msg, transactionInProgress, currBatchTp, pausedTopicPartitions, partitions, + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer); + } + } + @Override public KafkaTridentSpoutBatchMetadata emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata lastBatch) { - LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", - tx, currBatchPartition, lastBatch, collector); - - final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); - final Set assignments = kafkaConsumer.assignment(); KafkaTridentSpoutBatchMetadata currentBatch = lastBatch; - Collection pausedTopicPartitions = Collections.emptySet(); + try { + LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", + tx, currBatchPartition, lastBatch, collector); - if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { - LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " - + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " - + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, - kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); - } else { - try { - // pause other topic-partitions to only poll from current topic-partition - pausedTopicPartitions = pauseTopicPartitions(currBatchTp); + transactionInProgress = true; + final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); + kafkaConsListener.currBatchTp = currBatchTp; + + final Set assignments = kafkaConsumer.assignment(); + + if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { + LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " + + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, + kafkaConsumer, kafkaSpoutConfig.getConsumerGroupId()); + } else { + // pause other topic-partitions to poll only from current topic-partition + pauseTopicPartitions(assignments, currBatchTp); seek(currBatchTp, lastBatch); - // poll if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { -
kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment(); + kafkaSpoutConfig.getSubscription().refreshAssignment(); } + // The consumer rebalance listener is invoked during this poll if a rebalance occurs final ConsumerRecords records = kafkaConsumer.poll(pollTimeoutMs); - LOG.debug("Polled [{}] records from Kafka.", records.count()); - if (!records.isEmpty()) { - emitTuples(collector, records); - // build new metadata - currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp, records, lastBatch); + if (replayTransaction) { + replayTransaction(); + } else { + LOG.debug("Polled [{}] records from Kafka.", records.count()); + if (!records.isEmpty()) { + emitTuples(collector, records); + // build new metadata + currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp, records, lastBatch); + } } - } finally { - kafkaConsumer.resume(pausedTopicPartitions); - LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); } - LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " - + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); + } finally { + resumeTopicPartitions(); + transactionInProgress = false; + replayTransaction = false; + kafkaConsListener.currBatchTp = null; } - + LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); return currentBatch; } + // Logs the replay. The batch metadata is not updated, so it remains the previous transaction's metadata, which makes this emit a replay + private void replayTransaction() { + LOG.debug("Replaying transaction due to Kafka consumer rebalance. [consumer-group={}, consumer={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer); + } + + private void resumeTopicPartitions() { + final Collection resumedTps = pausedTopicPartitions; + kafkaConsumer.resume(pausedTopicPartitions); + pausedTopicPartitions = Collections.emptySet(); + LOG.trace("Resumed topic-partitions {}", resumedTps); + } + + private void pauseTopicPartitions(Collection assigned, TopicPartition tpNotToPause) { + final Set topicPartitionsToPause = new HashSet<>(assigned); + LOG.debug("Currently assigned topic-partitions {}", topicPartitionsToPause); + if (tpNotToPause != null) { + topicPartitionsToPause.remove(tpNotToPause); + } else { + LOG.warn("Topic-partition to exclude from pausing is null"); + } + kafkaConsumer.pause(topicPartitionsToPause); + LOG.debug("Paused topic-partitions {}", topicPartitionsToPause); + pausedTopicPartitions = topicPartitionsToPause; + } + private void emitTuples(TridentCollector collector, ConsumerRecords records) { for (ConsumerRecord record : records) { final List tuple = translator.apply(record); @@ -151,9 +230,10 @@ private void emitTuples(TridentCollector collector, ConsumerRecords record /** * Determines the offset of the next fetch. For failed batches lastBatchMeta is not null and contains the fetch * offset of the failed batch. In this scenario the next fetch will take place at offset of the failed batch + 1.
- * When the previous batch is successful, lastBatchMeta is null, and the offset of the next fetch is either the - * offset of the last commit to kafka, or if no commit was yet made, the offset dictated by - * {@link KafkaSpoutConfig.FirstPollOffsetStrategy} + * When the previous batch is successful, lastBatchMeta is null. For the first poll, the offset of the next fetch is + * the offset of the last commit to Kafka or, if no commit was yet made, the offset dictated by + * {@link KafkaSpoutConfig.FirstPollOffsetStrategy}. For subsequent polls it is the position at which the + * Kafka consumer instance left off, since no seek is performed and Kafka continues from the current position. * * @return the offset of the next fetch */ @@ -191,20 +271,10 @@ private boolean isFirstPoll(TopicPartition tp) { return !firstPoll.contains(tp); } - // returns paused topic-partitions. - private Collection pauseTopicPartitions(TopicPartition excludedTp) { - final Set pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment()); - LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions); - pausedTopicPartitions.remove(excludedTp); - kafkaConsumer.pause(pausedTopicPartitions); - LOG.debug("Paused topic-partitions {}", pausedTopicPartitions); - return pausedTopicPartitions; - } - @Override public void refreshPartitions(List partitionResponsibilities) { LOG.trace("Refreshing of topic-partitions handled by Kafka. " - + "No action taken by this method for topic partitions {}", partitionResponsibilities); + + "No action taken by this method for topic-partitions {}", partitionResponsibilities); } /** @@ -255,7 +325,11 @@ public void close() { @Override public String toString() { return super.toString() - + "{kafkaManager=" + kafkaManager - + '}'; + + "{kafkaManager=" + kafkaManager + + ", transactionInProgress=" + transactionInProgress + + ", pausedTopicPartitions=" + pausedTopicPartitions + + ", replayTransaction=" + replayTransaction + + ", currBatchTp=" + kafkaConsListener.currBatchTp + + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java index cf790b27cfd..5c5cbb5e7c6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java @@ -19,7 +19,6 @@ package org.apache.storm.kafka.spout.trident; import java.io.Serializable; -import java.util.Collection; import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,9 +33,6 @@ public class KafkaTridentSpoutManager implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class); - // Kafka - private transient KafkaConsumer kafkaConsumer; - // Bookkeeping private final KafkaSpoutConfig kafkaSpoutConfig; // Declare some KafkaSpoutConfig references for convenience @@ -48,15 +44,11 @@ public KafkaTridentSpoutManager(KafkaSpoutConfig kafkaSpoutConfig) { LOG.debug("Created {}", this); } - KafkaConsumer createAndSubscribeKafkaConsumer(TopologyContext context) { - kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + KafkaConsumer createAndSubscribeKafkaConsumer(TopologyContext context, ConsumerRebalanceListener listener) { + final KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); - kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context); - return kafkaConsumer; - } - - KafkaConsumer getKafkaConsumer() { + kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, listener, context); return kafkaConsumer; } @@ -79,7 +71,6 @@ Fields getFields() { } fields = fs; } - LOG.debug("OutputFields = {}", fields); return fields; } @@ -89,25 +80,6 @@ KafkaSpoutConfig getKafkaSpoutConfig() { @Override public String toString() { - return super.toString() - + "{kafkaConsumer=" + kafkaConsumer - + ", kafkaSpoutConfig=" + kafkaSpoutConfig - + '}'; - } - - private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { - @Override - public void onPartitionsRevoked(Collection partitions) { - LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions); - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions); - LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - } + return super.toString() + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}'; } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java new file mode 100644 index 00000000000..06f3a5d4b85 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class NamedTopicFilterTest { + + private KafkaConsumer consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); + + when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List partitionTwoPartitions = new ArrayList<>(); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); + when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java new file mode 100644 index 00000000000..5fa65117d72 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PatternTopicFilterTest { + + private KafkaConsumer consumerMock; + + @Before + public void setUp(){ + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + Pattern pattern = Pattern.compile("test-\\d+"); + PatternTopicFilter filter = new PatternTopicFilter(pattern); + + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + Map> allTopics = new HashMap<>(); + allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List testTwoPartitions = new ArrayList<>(); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + allTopics.put(matchingTopicTwo, testTwoPartitions); + allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + when(consumerMock.listTopics()).thenReturn(allTopics); + + List matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } +} diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java index e7bf70a5ff1..a778707d8e4 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java @@ -37,11 +37,11 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout { - protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class); - IOpaquePartitionedTridentSpout _spout; public class Coordinator implements ITridentSpout.BatchCoordinator { + protected final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + IOpaquePartitionedTridentSpout.Coordinator _coordinator; public Coordinator(Map conf, TopologyContext context) { @@ -50,8 +50,10 @@ public Coordinator(Map conf, TopologyContext context) { @Override public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) { - LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata); - return _coordinator.getPartitionsForBatch(); + final Object partitionsForBatch = _coordinator.getPartitionsForBatch(); + LOG.debug("Initialize Transaction. 
[txid = {}], [prevMetadata = {}], [currMetadata = {}], [partitionsForBatch = {}]", + txid, prevMetadata, currMetadata, partitionsForBatch); + return partitionsForBatch; } @@ -85,11 +87,13 @@ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { } } - public class Emitter implements ICommitterTridentSpout.Emitter { + public class Emitter implements ICommitterTridentSpout.Emitter { + protected final Logger LOG = LoggerFactory.getLogger(Emitter.class); + IOpaquePartitionedTridentSpout.Emitter _emitter; TransactionalState _state; TreeMap> _cachedMetas = new TreeMap<>(); - Map _partitionStates = new HashMap<>(); + Map _partitionStates = new HashMap<>(); // key is partition id int _index; int _numTasks; @@ -116,7 +120,7 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition)); } - // refresh all partitions for backwards compatibility with old spout + // refresh all partitions for backwards compatibility with old trident spout _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta)); _savedCoordinatorMeta = coordinatorMeta; _changedMeta = true;
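
Reviewer note, not part of the patch: a minimal sketch of how a topology could wire up the refactored subscription, using the APIs this change introduces (the builder overload taking a Subscription, ManualPartitionSubscription, NamedTopicFilter, and the now-Serializable ManualPartitioner). The broker address, topic name, class name, and the round-robin split are illustrative assumptions, not code from this patch.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.ManualPartitionSubscription;
    import org.apache.storm.kafka.spout.ManualPartitioner;
    import org.apache.storm.kafka.spout.NamedTopicFilter;

    public class ManualAssignmentExample {
        public static KafkaSpoutConfig<String, String> newSpoutConfig() {
            // Round-robin split of the (already sorted) partition list across spout tasks.
            // ManualPartitioner is a @FunctionalInterface that now extends Serializable,
            // so a lambda works and serializes with the topology.
            ManualPartitioner roundRobin = (allPartitions, context) -> {
                int taskIndex = context.getThisTaskIndex();
                int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
                List<TopicPartition> myPartitions = new ArrayList<>();
                for (int i = taskIndex; i < allPartitions.size(); i += totalTasks) {
                    myPartitions.add(allPartitions.get(i));
                }
                return myPartitions;
            };
            // A PatternTopicFilter, e.g. new PatternTopicFilter(Pattern.compile("test-\\d+")),
            // can be dropped in for the NamedTopicFilter below.
            return KafkaSpoutConfig.builder("localhost:9092",
                    new ManualPartitionSubscription(roundRobin, new NamedTopicFilter("sentences")))
                .build();
        }
    }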
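
Reviewer note, not part of the patch: the emitter's pause/resume logic leans on a standard KafkaConsumer contract, namely that paused partitions stay assigned but contribute no records to poll(). A self-contained sketch of that contract follows; the broker, topic, group id, and timeout are placeholders, and the pause/poll/resume sequence mirrors pauseTopicPartitions(assignments, currBatchTp) and resumeTopicPartitions() in KafkaTridentSpoutEmitter above.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PausePollSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "pause-poll-sketch");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition currBatchTp = new TopicPartition("sentences", 0);
                TopicPartition otherTp = new TopicPartition("sentences", 1);
                consumer.assign(Arrays.asList(currBatchTp, otherTp));
                // Pause everything except the partition whose batch is being emitted.
                Set<TopicPartition> toPause = new HashSet<>(consumer.assignment());
                toPause.remove(currBatchTp);
                consumer.pause(toPause);
                // Only currBatchTp can contribute records here; otherTp stays assigned but silent.
                ConsumerRecords<String, String> records = consumer.poll(200);
                System.out.println("Polled " + records.count() + " records from " + currBatchTp);
                // Equivalent of resumeTopicPartitions() in the emitter's finally block.
                consumer.resume(toPause);
            }
        }
    }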
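
Reviewer note, not part of the patch: a sketch of a user-defined TopicFilter against the new interface, to show the two-method contract (getFilteredTopicPartitions and getTopicsString). The class name and fixed-partition behavior are invented for illustration, and the wildcard generics on KafkaConsumer are an assumption since the rendered diff above does not show type parameters.

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.TopicFilter;

    /** Passes exactly one partition of one topic. Illustrative only. */
    public class SinglePartitionTopicFilter implements TopicFilter {
        private static final long serialVersionUID = 1L;
        private final String topic;
        private final int partition;

        public SinglePartitionTopicFilter(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
            // No metadata lookup needed: the topic-partition is named explicitly.
            return Collections.singletonList(new TopicPartition(topic, partition));
        }

        @Override
        public String getTopicsString() {
            return topic;
        }
    }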