From ba5260774bbd436e81da14a97c072e399a70a896 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 9 Mar 2018 20:57:53 +0100 Subject: [PATCH] STORM-2974: Add transactional spout to storm-kafka-client --- bin/storm.py | 4 +- docs/Command-line-client.md | 2 +- docs/Transactional-topologies.md | 6 +- docs/Trident-state.md | 4 +- docs/flux.md | 2 +- ...TridentKafkaClientTopologyNamedTopics.java | 16 +- .../kafka/spout/EmptyKafkaTupleListener.java | 1 - .../apache/storm/kafka/spout/KafkaSpout.java | 90 +++---- .../storm/kafka/spout/KafkaSpoutConfig.java | 1 - .../storm/kafka/spout/KafkaTupleListener.java | 1 - ...sumerFactory.java => ConsumerFactory.java} | 6 +- ...fault.java => ConsumerFactoryDefault.java} | 2 +- .../kafka/spout/internal/OffsetManager.java | 2 - .../spout/metrics/KafkaOffsetMetric.java | 15 +- .../spout/subscription/NamedTopicFilter.java | 4 +- .../subscription/PatternTopicFilter.java | 7 +- .../spout/subscription/TopicAssigner.java | 5 +- .../kafka/spout/subscription/TopicFilter.java | 4 +- .../KafkaTridentOpaqueSpoutEmitter.java | 68 ++++++ .../KafkaTridentSpoutBatchMetadata.java | 23 +- ...java => KafkaTridentSpoutCoordinator.java} | 21 +- .../trident/KafkaTridentSpoutEmitter.java | 219 ++++++++++++------ .../trident/KafkaTridentSpoutOpaque.java | 18 +- .../KafkaTridentSpoutTransactional.java | 65 ++++++ ...KafkaTridentTransactionalSpoutEmitter.java | 68 ++++++ .../internal/OutputFieldsExtractor.java | 41 ++++ .../kafka/spout/KafkaSpoutAbstractTest.java | 11 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 10 +- .../KafkaSpoutLogCompactionSupportTest.java | 39 ++-- .../spout/KafkaSpoutReactivationTest.java | 12 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 32 +-- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 19 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 7 +- .../SingleTopicKafkaUnitSetupHelper.java | 4 +- .../SpoutWithMockedConsumerSetupHelper.java | 4 +- .../KafkaTridentSpoutBatchMetadataTest.java | 10 +- .../trident/KafkaTridentSpoutEmitterTest.java | 147 ++++++++---- ...afkaTridentSpoutOpaqueCoordinatorTest.java | 9 +- 38 files changed, 695 insertions(+), 304 deletions(-) rename external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/{KafkaConsumerFactory.java => ConsumerFactory.java} (79%) rename external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/{KafkaConsumerFactoryDefault.java => ConsumerFactoryDefault.java} (91%) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java rename external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/{KafkaTridentSpoutOpaqueCoordinator.java => KafkaTridentSpoutCoordinator.java} (80%) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentTransactionalSpoutEmitter.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java diff --git a/bin/storm.py b/bin/storm.py index 23bab7305e6..3767b4585e6 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -361,7 +361,7 @@ def jar(jarfile, klass, *args): And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string. 
You can also exclude some dependencies like what you're doing in maven pom. Please add exclusion artifacts with '^' separated string after the artifact. - For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka. + For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka. When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string. Repository format is "^". '^' is taken as separator because URL allows various characters. For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add JBoss and HDP repositories for dependency resolver.
-Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"` +Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"` When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology. diff --git a/docs/Transactional-topologies.md b/docs/Transactional-topologies.md index db5509f6c08..612ac32f1a0 100644 --- a/docs/Transactional-topologies.md +++ b/docs/Transactional-topologies.md @@ -77,7 +77,7 @@ When using transactional topologies, Storm does the following for you: 3. *Fault detection:* Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you. 4. *First class batch processing API*: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts). -Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka) contains a transactional spout implementation for Kafka. +Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka-client](https://github.com/apache/storm/tree/master/external/storm-kafka-client) contains a transactional spout implementation for Kafka. ## The basics through example @@ -255,7 +255,7 @@ The details of implementing a `TransactionalSpout` are in [the Javadoc](javadocs #### Partitioned Transactional Spout -A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. 
For example, this is how [TransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/TransactionalKafkaSpout.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details. +A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [KafkaTridentSpoutTransactional]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details. ### Configuration @@ -325,7 +325,7 @@ In this scenario, tuples 41-50 are skipped. By failing all subsequent transactio By failing all subsequent transactions on failure, no tuples are skipped. This also shows that a requirement of transactional spouts is that they always emit where the last transaction left off. -A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [OpaqueTransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/OpaqueTransactionalKafkaSpout.java) is an example. `OpaqueTransactionalKafkaSpout` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section. +A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java) is an example. `KafkaTridentSpoutOpaque` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section. ## Implementation diff --git a/docs/Trident-state.md b/docs/Trident-state.md index a89dc3ca70f..ead8d86fe64 100644 --- a/docs/Trident-state.md +++ b/docs/Trident-state.md @@ -28,7 +28,7 @@ Remember, Trident processes tuples as small batches with each batch being given 2. There's no overlap between batches of tuples (tuples are in one batch or another, never multiple). 3. Every tuple is in a batch (no tuples are skipped) -This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. storm-contrib has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java) for Kafka.
+This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. Storm has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java) for Kafka. You might be wondering – why wouldn't you just always use a transactional spout? They're simple and easy to understand. One reason you might not use one is because they're not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You're now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are not unavailable), and processing will halt. @@ -72,7 +72,7 @@ As described before, an opaque transactional spout cannot guarantee that the bat 1. Every tuple is *successfully* processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch. -[OpaqueTridentKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches. +[KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for KafkaTridentSpoutOpaque to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches. With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates. diff --git a/docs/flux.md b/docs/flux.md index b1b359a0cd1..69cbb20a69f 100644 --- a/docs/flux.md +++ b/docs/flux.md @@ -31,7 +31,7 @@ the layout and configuration of your topologies. in your topology code * Support for existing topology code (see below) * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL - * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.) + * YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)
* Convenient support for multi-lang components * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style `${variable.name}` substitution) diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java index 4f3dd3812b1..6fecb5cc264 100644 --- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java @@ -37,6 +37,8 @@ import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; +import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional; +import org.apache.storm.trident.spout.ITridentDataSource; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; @@ -53,6 +55,11 @@ public class TridentKafkaClientTopologyNamedTopics { private KafkaTridentSpoutOpaque newKafkaTridentSpoutOpaque(KafkaSpoutConfig spoutConfig) { return new KafkaTridentSpoutOpaque<>(spoutConfig); } + + private KafkaTridentSpoutTransactional newKafkaTridentSpoutTransactional( + KafkaSpoutConfig spoutConfig) { + return new KafkaTridentSpoutTransactional<>(spoutConfig); + } private static final Func, List> JUST_VALUE_FUNC = new JustValueFunc(); @@ -66,7 +73,7 @@ public List apply(ConsumerRecord record) { return new Values(record.value()); } } - + protected KafkaSpoutConfig newKafkaSpoutConfig(String bootstrapServers) { return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2) .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime()) @@ -91,7 +98,8 @@ public static void main(String[] args) throws Exception { protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException { final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER; - System.out.println("Running with broker url " + brokerUrl); + final boolean isOpaque = args.length > 1 ? Boolean.parseBoolean(args[1]) : true; + System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque); Config tpConf = new Config(); tpConf.setDebug(true); @@ -101,7 +109,9 @@ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyE StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1)); StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2)); // Consumer + KafkaSpoutConfig spoutConfig = newKafkaSpoutConfig(brokerUrl); + ITridentDataSource spout = isOpaque ? 
newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig); StormSubmitter.submitTopology("topics-consumer", tpConf, - TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque(newKafkaSpoutConfig(brokerUrl)))); + TridentKafkaConsumerTopology.newTopology(spout)); } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java index 621fecda1da..a9a5cc54897 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; - import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index f5969afe660..9aeba6b4e76 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -36,8 +36,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -49,8 +49,8 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; import org.apache.storm.kafka.spout.internal.CommitMetadataManager; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric; @@ -75,9 +75,9 @@ public class KafkaSpout extends BaseRichSpout { // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; - private final KafkaConsumerFactory kafkaConsumerFactory; + private final ConsumerFactory kafkaConsumerFactory; private final TopicAssigner topicAssigner; - private transient KafkaConsumer kafkaConsumer; + private transient Consumer consumer; // Bookkeeping // Strategy to determine the fetch offset of the first realized by the spout upon activation @@ -106,11 +106,11 @@ public class KafkaSpout extends BaseRichSpout { private transient KafkaSpoutConsumerRebalanceListener rebalanceListener; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>(), new TopicAssigner()); + this(kafkaSpoutConfig, new ConsumerFactoryDefault<>(), new TopicAssigner()); } @VisibleForTesting - KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory, TopicAssigner topicAssigner) { + KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, ConsumerFactory kafkaConsumerFactory, TopicAssigner topicAssigner) { 
this.kafkaConsumerFactory = kafkaConsumerFactory; this.topicAssigner = topicAssigner; this.kafkaSpoutConfig = kafkaSpoutConfig; @@ -144,7 +144,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC rebalanceListener = new KafkaSpoutConsumerRebalanceListener(); - kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); + consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); tupleListener.open(conf, context); if (canRegisterMetrics()) { @@ -156,7 +156,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC private void registerMetric() { LOG.info("Registering Spout Metrics"); - kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer); + kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer); context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs()); } @@ -184,7 +184,7 @@ public void onPartitionsRevoked(Collection partitions) { previousAssignment = partitions; LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); @@ -194,7 +194,7 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", - context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); initialize(partitions); tupleListener.onPartitionsReassigned(partitions); @@ -217,7 +217,7 @@ private void initialize(Collection partitions) { Set newPartitions = new HashSet<>(partitions); newPartitions.removeAll(previousAssignment); for (TopicPartition newTp : newPartitions) { - final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp); + final OffsetAndMetadata committedOffset = consumer.committed(newTp); final long fetchOffset = doSeek(newTp, committedOffset); LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]", fetchOffset, newTp, firstPollOffsetStrategy, committedOffset); @@ -242,29 +242,29 @@ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { committedOffset, Collections.unmodifiableMap(offsetManagers))) { // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply. - kafkaConsumer.seek(newTp, committedOffset.offset()); + consumer.seek(newTp, committedOffset.offset()); } else { // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies // (only when the topology is first deployed). if (firstPollOffsetStrategy.equals(EARLIEST)) { - kafkaConsumer.seekToBeginning(Collections.singleton(newTp)); + consumer.seekToBeginning(Collections.singleton(newTp)); } else if (firstPollOffsetStrategy.equals(LATEST)) { - kafkaConsumer.seekToEnd(Collections.singleton(newTp)); + consumer.seekToEnd(Collections.singleton(newTp)); } else { // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed. 
- kafkaConsumer.seek(newTp, committedOffset.offset()); + consumer.seek(newTp, committedOffset.offset()); } } } else { // no offset commits have ever been done for this consumer group and topic-partition, // so start at the beginning or end depending on FirstPollOffsetStrategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { - kafkaConsumer.seekToBeginning(Collections.singleton(newTp)); + consumer.seekToBeginning(Collections.singleton(newTp)); } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { - kafkaConsumer.seekToEnd(Collections.singleton(newTp)); + consumer.seekToEnd(Collections.singleton(newTp)); } } - return kafkaConsumer.position(newTp); + return consumer.position(newTp); } } @@ -280,9 +280,9 @@ public void nextTuple() { if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { - Map offsetsToCommit = - createFetchedOffsetsMetadata(kafkaConsumer.assignment()); - kafkaConsumer.commitAsync(offsetsToCommit, null); + Map offsetsToCommit = + createFetchedOffsetsMetadata(consumer.assignment()); + consumer.commitAsync(offsetsToCommit, null); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } } @@ -314,7 +314,7 @@ private PollablePartitionsInfo getPollablePartitionsInfo() { return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap()); } - Set assignment = kafkaConsumer.assignment(); + Set assignment = consumer.assignment(); if (!isAtLeastOnceProcessing()) { return new PollablePartitionsInfo(assignment, Collections.emptyMap()); } @@ -357,32 +357,32 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) { // ======== poll ========= private ConsumerRecords pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) { doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets); - Set pausedPartitions = new HashSet<>(kafkaConsumer.assignment()); + Set pausedPartitions = new HashSet<>(consumer.assignment()); pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains); try { - kafkaConsumer.pause(pausedPartitions); - final ConsumerRecords consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + consumer.pause(pausedPartitions); + final ConsumerRecords consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka", numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. 
- Map offsetsToCommit = - createFetchedOffsetsMetadata(kafkaConsumer.assignment()); - kafkaConsumer.commitSync(offsetsToCommit); + Map offsetsToCommit = + createFetchedOffsetsMetadata(consumer.assignment()); + consumer.commitSync(offsetsToCommit); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } return consumerRecords; } finally { - kafkaConsumer.resume(pausedPartitions); + consumer.resume(pausedPartitions); } } private void doSeekRetriableTopicPartitions(Map pollableEarliestRetriableOffsets) { for (Entry retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) { //Seek directly to the earliest retriable message for each retriable topic partition - kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue()); + consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue()); } } @@ -471,8 +471,10 @@ private boolean emitOrRetryTuple(ConsumerRecord record) { return true; } } else { - /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately - * to allow its offset to be commited to Kafka*/ + /* + * if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately to allow its offset + * to be commited to Kafka + */ LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record); if (isAtLeastOnceProcessing()) { msgId.setNullTuple(true); @@ -494,11 +496,11 @@ private boolean isEmitTuple(List tuple) { private Map createFetchedOffsetsMetadata(Set assignedPartitions) { Map offsetsToCommit = new HashMap<>(); for (TopicPartition tp : assignedPartitions) { - offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata())); + offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp), commitMetadataManager.getCommitMetadata())); } return offsetsToCommit; } - + private void commitOffsetsForAckedTuples() { final Map nextCommitOffsets = new HashMap<>(); for (Map.Entry tpOffset : offsetManagers.entrySet()) { @@ -510,14 +512,14 @@ private void commitOffsetsForAckedTuples() { // Commit offsets that are ready to be committed for every topic partition if (!nextCommitOffsets.isEmpty()) { - kafkaConsumer.commitSync(nextCommitOffsets); + consumer.commitSync(nextCommitOffsets); LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop for (Map.Entry tpOffset : nextCommitOffsets.entrySet()) { //Update the OffsetManager for each committed partition, and update numUncommittedOffsets final TopicPartition tp = tpOffset.getKey(); - long position = kafkaConsumer.position(tp); + long position = consumer.position(tp); long committedOffset = tpOffset.getValue().offset(); if (position < committedOffset) { /* @@ -528,7 +530,7 @@ private void commitOffsetsForAckedTuples() { */ LOG.debug("Consumer fell behind committed offset. Catching up. 
Position was [{}], skipping to [{}]", position, committedOffset); - kafkaConsumer.seek(tp, committedOffset); + consumer.seek(tp, committedOffset); List> waitingToEmitForTp = waitingToEmit.get(tp); if (waitingToEmitForTp != null) { //Discard the pending records that are already committed @@ -568,11 +570,11 @@ public void ack(Object messageId) { if (!emitted.contains(msgId)) { LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " - + "came from a topic-partition that this consumer group instance is no longer tracking " - + "due to rebalance/partition reassignment. No action taken.", msgId); + + "came from a topic-partition that this consumer group instance is no longer tracking " + + "due to rebalance/partition reassignment. No action taken.", msgId); } else { Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." - + " This should never occur barring errors in the RetryService implementation or the spout code."); + + " This should never occur barring errors in the RetryService implementation or the spout code."); offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } @@ -621,12 +623,12 @@ public void activate() { } private void refreshAssignment() { - Set allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(kafkaConsumer); + Set allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer); List allPartitionsSorted = new ArrayList<>(allPartitions); Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE); Set assignedPartitions = kafkaSpoutConfig.getTopicPartitioner() .getPartitionsForThisTask(allPartitionsSorted, context); - topicAssigner.assignPartitions(kafkaConsumer, assignedPartitions, rebalanceListener); + topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener); } @Override @@ -658,7 +660,7 @@ private void shutdown() { commitIfNecessary(); } finally { //remove resources - kafkaConsumer.close(); + consumer.close(); } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 3b7be2b4736..b17a47cee78 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -25,7 +25,6 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java index 3f16220ad7e..10014831285 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; - import org.apache.kafka.common.TopicPartition; import org.apache.storm.task.TopologyContext; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java similarity index 79% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java index fb709277f5c..5ca70804154 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java @@ -17,12 +17,12 @@ package org.apache.storm.kafka.spout.internal; import java.io.Serializable; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.storm.kafka.spout.KafkaSpoutConfig; /** * This is here to enable testing. */ -public interface KafkaConsumerFactory extends Serializable { - public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig); +public interface ConsumerFactory extends Serializable { + public Consumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java similarity index 91% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java index 9a8142a7990..c3843765fcf 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.storm.kafka.spout.KafkaSpoutConfig; -public class KafkaConsumerFactoryDefault implements KafkaConsumerFactory { +public class ConsumerFactoryDefault implements ConsumerFactory { @Override public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index c9f9541a7df..5bec5b83029 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -17,13 +17,11 @@ package org.apache.storm.kafka.spout.internal; import com.google.common.annotations.VisibleForTesting; - import java.util.Comparator; import java.util.Iterator; import java.util.NavigableSet; import java.util.NoSuchElementException; import java.util.TreeSet; - import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java index 26eb135690b..da84979b017 100644 --- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java @@ -22,8 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; - -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.metric.api.IMetric; @@ -55,10 +54,10 @@ public class KafkaOffsetMetric implements IMetric { private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); private final Supplier> offsetManagerSupplier; - private final Supplier> consumerSupplier; + private final Supplier> consumerSupplier; public KafkaOffsetMetric(Supplier> offsetManagerSupplier, - Supplier> consumerSupplier) { + Supplier> consumerSupplier) { this.offsetManagerSupplier = offsetManagerSupplier; this.consumerSupplier = consumerSupplier; } @@ -67,9 +66,9 @@ public KafkaOffsetMetric(Supplier> offsetMana public Object getValueAndReset() { Map offsetManagers = offsetManagerSupplier.get(); - KafkaConsumer kafkaConsumer = consumerSupplier.get(); + Consumer consumer = consumerSupplier.get(); - if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { + if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) { LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null."); return null; } @@ -77,8 +76,8 @@ public Object getValueAndReset() { Map topicMetricsMap = new HashMap<>(); Set topicPartitions = offsetManagers.keySet(); - Map beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions); - Map endOffsets = kafkaConsumer.endOffsets(topicPartitions); + Map beginningOffsets = consumer.beginningOffsets(topicPartitions); + Map endOffsets = consumer.endOffsets(topicPartitions); //map to hold partition level and topic level metrics Map result = new HashMap<>(); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java index 15912650228..87257d889f8 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java @@ -21,7 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -52,7 +52,7 @@ public NamedTopicFilter(String... 
topics) { } @Override - public Set getAllSubscribedPartitions(KafkaConsumer consumer) { + public Set getAllSubscribedPartitions(Consumer consumer) { Set allPartitions = new HashSet<>(); for (String topic : topics) { List partitionInfoList = consumer.partitionsFor(topic); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java index 554876fe7c8..9ba49dd4284 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java @@ -16,17 +16,14 @@ package org.apache.storm.kafka.spout.subscription; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.TopicPartitionComparator; /** * Filter that returns all partitions for topics matching the given {@link Pattern}. @@ -46,7 +43,7 @@ public PatternTopicFilter(Pattern pattern) { } @Override - public Set getAllSubscribedPartitions(KafkaConsumer consumer) { + public Set getAllSubscribedPartitions(Consumer consumer) { topics.clear(); Set allPartitions = new HashSet<>(); for (Map.Entry> entry : consumer.listTopics().entrySet()) { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java index dcc93ceef07..300adececb4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java @@ -17,10 +17,9 @@ package org.apache.storm.kafka.spout.subscription; import java.io.Serializable; -import java.util.HashSet; import java.util.Set; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; /** @@ -38,7 +37,7 @@ public class TopicAssigner implements Serializable { * @param newAssignment The partitions to assign. 
* @param listener The rebalance listener to call back on when the assignment changes */ - public void assignPartitions(KafkaConsumer consumer, Set newAssignment, + public void assignPartitions(Consumer consumer, Set newAssignment, ConsumerRebalanceListener listener) { Set currentAssignment = consumer.assignment(); if (!newAssignment.equals(currentAssignment)) { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java index ae2c2549854..6c5941986d0 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java @@ -18,7 +18,7 @@ import java.io.Serializable; import java.util.Set; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; public interface TopicFilter extends Serializable { @@ -28,7 +28,7 @@ public interface TopicFilter extends Serializable { * @param consumer The Kafka consumer to use to read the list of existing partitions * @return The Kafka partitions this set of spouts should subscribe to */ - Set getAllSubscribedPartitions(KafkaConsumer consumer); + Set getAllSubscribedPartitions(Consumer consumer); /** * Get the topics string. diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java new file mode 100644 index 00000000000..b2d79db20fa --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.trident; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.topology.TransactionAttempt; + +public class KafkaTridentOpaqueSpoutEmitter implements IOpaquePartitionedTridentSpout.Emitter< + List>, + KafkaTridentSpoutTopicPartition, + Map>, + Serializable { + + private static final long serialVersionUID = 1; + private final KafkaTridentSpoutEmitter emitter; + + public KafkaTridentOpaqueSpoutEmitter(KafkaTridentSpoutEmitter emitter) { + this.emitter = emitter; + } + + @Override + public Map emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, + KafkaTridentSpoutTopicPartition partition, Map lastPartitionMeta) { + return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta); + } + + @Override + public void refreshPartitions(List partitionResponsibilities) { + emitter.refreshPartitions(partitionResponsibilities); + } + + @Override + public List getOrderedPartitions(List> allPartitionInfo) { + return emitter.getOrderedPartitions(allPartitionInfo); + } + + @Override + public List getPartitionsForTask(int taskId, int numTasks, + List allPartitionInfoSorted) { + return emitter.getPartitionsForTask(taskId, numTasks, allPartitionInfoSorted); + } + + @Override + public void close() { + emitter.close(); + } + + + +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java index 6e56bb5627d..f0d44b595d3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java @@ -33,15 +33,17 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class); - private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer(); public static final String FIRST_OFFSET_KEY = "firstOffset"; public static final String LAST_OFFSET_KEY = "lastOffset"; + public static final String TOPOLOGY_ID_KEY = "topologyId"; // first offset of this batch private final long firstOffset; // last offset of this batch private final long lastOffset; + //The unique topology id for the topology that created this metadata + private final String topologyId; /** * Builds a metadata object. @@ -49,9 +51,10 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable { * @param firstOffset The first offset for the batch * @param lastOffset The last offset for the batch */ - public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset) { + public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset, String topologyId) { this.firstOffset = firstOffset; this.lastOffset = lastOffset; + this.topologyId = topologyId; } /** @@ -59,11 +62,12 @@ public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset) { * * @param consumerRecords The non-empty set of records. 
*/ - public KafkaTridentSpoutBatchMetadata(List> consumerRecords) { + public KafkaTridentSpoutBatchMetadata(List> consumerRecords, String topologyId) { Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata"); firstOffset = consumerRecords.get(0).offset(); lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset(); + this.topologyId = topologyId; LOG.debug("Created {}", this.toString()); } @@ -75,6 +79,10 @@ public long getLastOffset() { return lastOffset; } + public String getTopologyId() { + return topologyId; + } + /** * Constructs a metadata object from a Map in the format produced by {@link #toMap() }. * @@ -82,8 +90,11 @@ public long getLastOffset() { * @return A new metadata object */ public static KafkaTridentSpoutBatchMetadata fromMap(Map map) { - return new KafkaTridentSpoutBatchMetadata(((Number) map.get(FIRST_OFFSET_KEY)).longValue(), - ((Number) map.get(LAST_OFFSET_KEY)).longValue()); + return new KafkaTridentSpoutBatchMetadata( + ((Number) map.get(FIRST_OFFSET_KEY)).longValue(), + ((Number) map.get(LAST_OFFSET_KEY)).longValue(), + (String) map.get(TOPOLOGY_ID_KEY) + ); } /** @@ -93,6 +104,7 @@ public Map toMap() { Map map = new HashMap<>(); map.put(FIRST_OFFSET_KEY, firstOffset); map.put(LAST_OFFSET_KEY, lastOffset); + map.put(TOPOLOGY_ID_KEY, topologyId); return map; } @@ -101,6 +113,7 @@ public final String toString() { return "KafkaTridentSpoutBatchMetadata{" + "firstOffset=" + firstOffset + ", lastOffset=" + lastOffset + + ", topologyId=" + topologyId + '}'; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java similarity index 80% rename from external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java rename to external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java index 3aa3a99d3f0..4e46d4c5926 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java @@ -24,26 +24,29 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaTridentSpoutOpaqueCoordinator implements IOpaquePartitionedTridentSpout.Coordinator>>, +public class KafkaTridentSpoutCoordinator implements + IOpaquePartitionedTridentSpout.Coordinator>>, + IPartitionedTridentSpout.Coordinator>>, Serializable { //Initial delay for the assignment refresh timer public static final 
long TIMER_DELAY_MS = 500; - private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutCoordinator.class); private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); private final KafkaSpoutConfig kafkaSpoutConfig; private final Timer refreshAssignmentTimer; - private final KafkaConsumer consumer; + private final Consumer consumer; private Set partitionsForBatch; @@ -51,11 +54,11 @@ public class KafkaTridentSpoutOpaqueCoordinator implements IOpaquePartition * Creates a new coordinator based on the given spout config. * @param kafkaSpoutConfig The spout config to use */ - public KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>()); + public KafkaTridentSpoutCoordinator(KafkaSpoutConfig kafkaSpoutConfig) { + this(kafkaSpoutConfig, new ConsumerFactoryDefault<>()); } - KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory consumerFactory) { + KafkaTridentSpoutCoordinator(KafkaSpoutConfig kafkaSpoutConfig, ConsumerFactory consumerFactory) { this.kafkaSpoutConfig = kafkaSpoutConfig; this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 27e75c23616..22f21c70b61 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -23,6 +23,7 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; +import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -33,40 +34,35 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.RecordTranslator; import org.apache.storm.kafka.spout.TopicPartitionComparator; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTridentSpout.Emitter< - List>, - KafkaTridentSpoutTopicPartition, - Map>, - Serializable { +public class KafkaTridentSpoutEmitter implements Serializable { private static final long serialVersionUID = -7343927794834130435L; private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class); // Kafka - private final KafkaConsumer kafkaConsumer; + private final Consumer consumer; private final KafkaSpoutConfig kafkaSpoutConfig; private final TopicAssigner topicAssigner; - + // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at. - private final Map tpToFirstSeekOffset = new HashMap<>(); + private final Map tpToFirstSeekOffset = new HashMap<>(); private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; @@ -76,17 +72,19 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident /** * Create a new Kafka spout emitter. + * * @param kafkaSpoutConfig The kafka spout config * @param topologyContext The topology context */ public KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext) { - this(kafkaSpoutConfig, topologyContext, new KafkaConsumerFactoryDefault<>(), new TopicAssigner()); + this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner()); } - + + @VisibleForTesting KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, TopologyContext topologyContext, - KafkaConsumerFactory consumerFactory, TopicAssigner topicAssigner) { + ConsumerFactory consumerFactory, TopicAssigner topicAssigner) { this.kafkaSpoutConfig = kafkaSpoutConfig; - this.kafkaConsumer = consumerFactory.createConsumer(kafkaSpoutConfig); + this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig); this.topologyContext = topologyContext; this.translator = kafkaSpoutConfig.getTranslator(); this.topicAssigner = topicAssigner; @@ -95,57 +93,123 @@ public KafkaTridentSpoutEmitter(KafkaSpoutConfig kafkaSpoutConfig, Topolog LOG.debug("Created {}", this.toString()); } - @Override - public Map emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, - KafkaTridentSpoutTopicPartition currBatchPartition, Map lastBatch) { + /** + * Emit a batch that has already been emitted. 
+ */ + public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector, + KafkaTridentSpoutTopicPartition currBatchPartition, Map currBatch) { + + final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); + + throwIfEmittingForUnassignedPartition(currBatchTp); + + KafkaTridentSpoutBatchMetadata currBatchMeta = KafkaTridentSpoutBatchMetadata.fromMap(currBatch); + Collection pausedTopicPartitions = Collections.emptySet(); + + if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId()) + && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) { + LOG.debug("Skipping re-emit of batch that was originally emitted by another topology," + + " because the current first poll offset strategy ignores committed offsets."); + return; + } + + LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]", + tx, currBatchPartition, currBatch, collector); + + try { + // pause other topic-partitions to only poll from current topic-partition + pausedTopicPartitions = pauseTopicPartitions(currBatchTp); + + long seekOffset = currBatchMeta.getFirstOffset(); + LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp); + consumer.seek(currBatchTp, seekOffset); + + final ConsumerRecords records = consumer.poll(pollTimeoutMs); + LOG.debug("Polled [{}] records from Kafka.", records.count()); + + for (ConsumerRecord record : records) { + if (record.offset() == currBatchMeta.getLastOffset() + 1) { + break; + } + if (record.offset() > currBatchMeta.getLastOffset()) { + throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch." + + " The batch end offset was [{%d}], but received [{%d}]." + + " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.", + currBatchMeta.getLastOffset(), record.offset())); + } + emitTuple(collector, record); + } + } finally { + consumer.resume(pausedTopicPartitions); + LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); + } + LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], " + + "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector); + } + + /** + * Emit a new batch. + */ + public Map emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, + KafkaTridentSpoutTopicPartition currBatchPartition, Map lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", - tx, currBatchPartition, lastBatch, collector); + tx, currBatchPartition, lastBatch, collector); final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); - final Set assignments = kafkaConsumer.assignment(); + + throwIfEmittingForUnassignedPartition(currBatchTp); + KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch); KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta; Collection pausedTopicPartitions = Collections.emptySet(); - if (!assignments.contains(currBatchPartition.getTopicPartition())) { - throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned." - + " This indicates a bug in the TopicFilter or ManualPartitioner implementations." 
- + " The current partition is [" + currBatchPartition + "], the assigned partitions are [" + assignments + "]."); - } else { - try { - // pause other topic-partitions to only poll from current topic-partition - pausedTopicPartitions = pauseTopicPartitions(currBatchTp); + try { + // pause other topic-partitions to only poll from current topic-partition + pausedTopicPartitions = pauseTopicPartitions(currBatchTp); - seek(currBatchTp, lastBatchMeta); + seek(currBatchTp, lastBatchMeta); - final ConsumerRecords records = kafkaConsumer.poll(pollTimeoutMs); - LOG.debug("Polled [{}] records from Kafka.", records.count()); + final ConsumerRecords records = consumer.poll(pollTimeoutMs); + LOG.debug("Polled [{}] records from Kafka.", records.count()); - if (!records.isEmpty()) { - emitTuples(collector, records); - // build new metadata - currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp)); + if (!records.isEmpty()) { + for (ConsumerRecord record : records) { + emitTuple(collector, record); } - } finally { - kafkaConsumer.resume(pausedTopicPartitions); - LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); + // build new metadata + currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId()); } - LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " - + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); + } finally { + consumer.resume(pausedTopicPartitions); + LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); } + LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); return currentBatch == null ? null : currentBatch.toMap(); } - private void emitTuples(TridentCollector collector, ConsumerRecords records) { - for (ConsumerRecord record : records) { - final List tuple = translator.apply(record); - collector.emit(tuple); - LOG.debug("Emitted tuple {} for record [{}]", tuple, record); + private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() { + return firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST + || firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; + } + + private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) { + final Set assignments = consumer.assignment(); + if (!assignments.contains(currBatchTp)) { + throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned." + + " This indicates a bug in the TopicFilter or ManualPartitioner implementations." + + " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "]."); } } + private void emitTuple(TridentCollector collector, ConsumerRecord record) { + final List tuple = translator.apply(record); + collector.emit(tuple); + LOG.debug("Emitted tuple {} for record [{}]", tuple, record); + } + /** * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition. * Otherwise the next offset will be one past the last batch, based on lastBatchMeta. 
@@ -163,37 +227,37 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet if (isFirstPoll(tp)) { if (firstPollOffsetStrategy == EARLIEST) { LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); - kafkaConsumer.seekToBeginning(Collections.singleton(tp)); + consumer.seekToBeginning(Collections.singleton(tp)); } else if (firstPollOffsetStrategy == LATEST) { LOG.debug("First poll for topic partition [{}], seeking to partition end", tp); - kafkaConsumer.seekToEnd(Collections.singleton(tp)); + consumer.seekToEnd(Collections.singleton(tp)); } else if (lastBatchMeta != null) { LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); - kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch + consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) { LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp); - kafkaConsumer.seekToBeginning(Collections.singleton(tp)); + consumer.seekToBeginning(Collections.singleton(tp)); } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) { LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp); - kafkaConsumer.seekToEnd(Collections.singleton(tp)); + consumer.seekToEnd(Collections.singleton(tp)); } - tpToFirstSeekOffset.put(tp, kafkaConsumer.position(tp)); + tpToFirstSeekOffset.put(tp, consumer.position(tp)); } else if (lastBatchMeta != null) { - kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch + consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); } else { /* - * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. - * This is either a replay of the first batch for this partition, or all previous batches were empty, - * otherwise last batch meta could not be null. Use the offset the consumer started at. + * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. This is either + * a replay of the first batch for this partition, or all previous batches were empty, otherwise last batch meta could not be + * null. Use the offset the consumer started at. */ long initialFetchOffset = tpToFirstSeekOffset.get(tp); - kafkaConsumer.seek(tp, initialFetchOffset); + consumer.seek(tp, initialFetchOffset); LOG.debug("First poll for topic partition [{}], no last batch metadata present." + " Using stored initial fetch offset [{}]", tp, initialFetchOffset); } - final long fetchOffset = kafkaConsumer.position(tp); + final long fetchOffset = consumer.position(tp); LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp); return fetchOffset; } @@ -204,15 +268,17 @@ private boolean isFirstPoll(TopicPartition tp) { // returns paused topic-partitions. 
private Collection pauseTopicPartitions(TopicPartition excludedTp) { - final Set pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment()); + final Set pausedTopicPartitions = new HashSet<>(consumer.assignment()); LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions); pausedTopicPartitions.remove(excludedTp); - kafkaConsumer.pause(pausedTopicPartitions); + consumer.pause(pausedTopicPartitions); LOG.debug("Paused topic-partitions {}", pausedTopicPartitions); return pausedTopicPartitions; } - @Override + /** + * Get the input partitions in sorted order. + */ public List getOrderedPartitions(final List> allPartitionInfo) { List sortedPartitions = allPartitionInfo.stream() .map(map -> tpSerializer.fromMap(map)) @@ -220,29 +286,32 @@ public List getOrderedPartitions(final List allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions); LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", - allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); + allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); return allPartitions; } - @Override + /** + * Get the partitions that should be handled by this task. + */ public List getPartitionsForTask(int taskId, int numTasks, List allPartitionInfoSorted) { List tps = allPartitionInfoSorted.stream() .map(kttp -> kttp.getTopicPartition()) .collect(Collectors.toList()); final Set assignedTps = kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(tps, topologyContext); - LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps); + LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", consumer, taskId, assignedTps); final List taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps); return taskTps; } - - - @Override + + /** + * Prepare the emitter to handle the input partitions. + */ public void refreshPartitions(List partitionResponsibilities) { Set assignedTps = partitionResponsibilities.stream() .map(kttp -> kttp.getTopicPartition()) .collect(Collectors.toSet()); - topicAssigner.assignPartitions(kafkaConsumer, assignedTps, new KafkaSpoutConsumerRebalanceListener()); + topicAssigner.assignPartitions(consumer, assignedTps, new KafkaSpoutConsumerRebalanceListener()); LOG.debug("Assigned partitions [{}] to this task", assignedTps); } @@ -259,33 +328,33 @@ private int getNumTasks() { return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size(); } - @Override public void close() { - kafkaConsumer.close(); + consumer.close(); LOG.debug("Closed"); } @Override public final String toString() { return super.toString() - + "{kafkaSpoutConfig=" + kafkaSpoutConfig - + '}'; + + "{kafkaSpoutConfig=" + kafkaSpoutConfig + + '}'; } - + /** * Just logs reassignments. */ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override public void onPartitionsRevoked(Collection partitions) { LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); } @Override public void onPartitionsAssigned(Collection partitions) { LOG.info("Partitions reassignment. 
[consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); } } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java index 3257be799e6..420e6f171ce 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.kafka.spout.trident.internal.OutputFieldsExtractor; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.tuple.Fields; @@ -35,21 +36,26 @@ public class KafkaTridentSpoutOpaque implements IOpaquePartitionedTridentSp private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class); private final KafkaSpoutConfig kafkaSpoutConfig; + private final OutputFieldsExtractor outputFieldsExtractor; + /** + * Creates a new opaque transactional Trident Kafka spout. + */ public KafkaTridentSpoutOpaque(KafkaSpoutConfig kafkaSpoutConfig) { this.kafkaSpoutConfig = kafkaSpoutConfig; + this.outputFieldsExtractor = new OutputFieldsExtractor(); LOG.debug("Created {}", this.toString()); } @Override public Emitter>, KafkaTridentSpoutTopicPartition, Map> getEmitter( Map conf, TopologyContext context) { - return new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context); + return new KafkaTridentOpaqueSpoutEmitter<>(new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context)); } @Override public Coordinator>> getCoordinator(Map conf, TopologyContext context) { - return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaSpoutConfig); + return new KafkaTridentSpoutCoordinator<>(kafkaSpoutConfig); } @Override @@ -59,13 +65,7 @@ public Map getComponentConfiguration() { @Override public Fields getOutputFields() { - RecordTranslator translator = kafkaSpoutConfig.getTranslator(); - int numStreams = translator.streams().size(); - if (numStreams > 1) { - throw new IllegalStateException("Trident spouts must have at most one output stream," - + " found streams [" + translator.streams() + "]"); - } - final Fields outputFields = translator.getFieldsFor(translator.streams().get(0)); + final Fields outputFields = outputFieldsExtractor.getOutputFields(kafkaSpoutConfig); LOG.debug("OutputFields = {}", outputFields); return outputFields; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java new file mode 100644 index 00000000000..2d1e9de2d6d --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.trident; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.trident.internal.OutputFieldsExtractor; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; +import org.apache.storm.tuple.Fields; + +public class KafkaTridentSpoutTransactional implements IPartitionedTridentSpout>, + KafkaTridentSpoutTopicPartition, Map>, + Serializable { + private static final long serialVersionUID = 1L; + + private final KafkaSpoutConfig kafkaSpoutConfig; + private final OutputFieldsExtractor outputFieldsExtractor; + + /** + * Creates a new non-opaque transactional Trident Kafka spout. + */ + public KafkaTridentSpoutTransactional(KafkaSpoutConfig kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; + this.outputFieldsExtractor = new OutputFieldsExtractor(); + } + + @Override + public Coordinator>> getCoordinator(Map conf, TopologyContext context) { + return new KafkaTridentSpoutCoordinator<>(kafkaSpoutConfig); + } + + @Override + public Emitter>, KafkaTridentSpoutTopicPartition, Map> getEmitter( + Map conf, TopologyContext context) { + return new KafkaTridentTransactionalSpoutEmitter<>(new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context)); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + return outputFieldsExtractor.getOutputFields(kafkaSpoutConfig); + } + +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentTransactionalSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentTransactionalSpoutEmitter.java new file mode 100644 index 00000000000..58b799fdb01 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentTransactionalSpoutEmitter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.trident; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; +import org.apache.storm.trident.topology.TransactionAttempt; + +public class KafkaTridentTransactionalSpoutEmitter implements IPartitionedTridentSpout.Emitter< + List>, + KafkaTridentSpoutTopicPartition, + Map>, + Serializable { + + private static final long serialVersionUID = 1; + private final KafkaTridentSpoutEmitter emitter; + + public KafkaTridentTransactionalSpoutEmitter(KafkaTridentSpoutEmitter emitter) { + this.emitter = emitter; + } + + @Override + public List getOrderedPartitions(List> allPartitionInfo) { + return emitter.getOrderedPartitions(allPartitionInfo); + } + + @Override + public Map emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, + KafkaTridentSpoutTopicPartition partition, Map lastPartitionMeta) { + return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta); + } + + @Override + public void refreshPartitions(List partitionResponsibilities) { + emitter.refreshPartitions(partitionResponsibilities); + } + + @Override + public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, + KafkaTridentSpoutTopicPartition partition, Map partitionMeta) { + emitter.reEmitPartitionBatch(tx, collector, partition, partitionMeta); + } + + @Override + public void close() { + emitter.close(); + } + + + +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java new file mode 100644 index 00000000000..93a2a054244 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.trident.internal; + +import java.io.Serializable; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.tuple.Fields; + +public class OutputFieldsExtractor implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * Extract the output fields from the config. + * Throws an error if there are multiple declared output streams, since Trident only supports one output stream per spout. 
+ */ + public Fields getOutputFields(KafkaSpoutConfig kafkaSpoutConfig) { + RecordTranslator translator = kafkaSpoutConfig.getTranslator(); + int numStreams = translator.streams().size(); + if (numStreams > 1) { + throw new IllegalStateException("Trident spouts must have at most one output stream," + + " found streams [" + translator.streams() + "]"); + } + return translator.getFieldsFor(translator.streams().get(0)); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java index 0692c917ea5..f51b159624e 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java @@ -33,8 +33,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.KafkaUnitExtension; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -90,9 +91,9 @@ protected KafkaConsumer getKafkaConsumer() { return consumerSpy; } - private KafkaConsumerFactory createConsumerFactory() { + private ConsumerFactory createConsumerFactory() { - return new KafkaConsumerFactory() { + return new ConsumerFactory() { @Override public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { return consumerSpy; @@ -102,7 +103,7 @@ public KafkaConsumer createConsumer(KafkaSpoutConfig createConsumerSpy() { - return spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + return spy(new ConsumerFactoryDefault().createConsumer(spoutConfig)); } @AfterEach diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index c8f490cc868..7fffb457cf3 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -15,6 +15,13 @@ */ package org.apache.storm.kafka.spout; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -29,12 +36,13 @@ import java.util.List; import java.util.Map; import java.util.Optional; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java index 01ee9e4a97e..11861e2c431 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -15,29 +15,8 @@ */ package org.apache.storm.kafka.spout; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InOrder; -import org.mockito.MockitoAnnotations; - import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.is; - import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -52,8 +31,26 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.subscription.ManualPartitioner; import org.apache.storm.kafka.spout.subscription.TopicFilter; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; public class KafkaSpoutLogCompactionSupportTest { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java index e763e007e4d..27c73722c55 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -31,15 +31,15 @@ import java.util.HashMap; import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; import 
org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.KafkaUnitExtension; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -65,7 +65,7 @@ public class KafkaSpoutReactivationTest { private final Map conf = new HashMap<>(); private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); private final long commitOffsetPeriodMs = 2_000; - private KafkaConsumer consumerSpy; + private Consumer consumerSpy; private KafkaSpout spout; private final int maxPollRecords = 10; @@ -77,9 +77,9 @@ public void prepareSpout(int messageCount, FirstPollOffsetStrategy firstPollOffs .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .build(); - KafkaConsumerFactory consumerFactory = new KafkaConsumerFactoryDefault<>(); + ConsumerFactory consumerFactory = new ConsumerFactoryDefault<>(); this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); - KafkaConsumerFactory consumerFactoryMock = mock(KafkaConsumerFactory.class); + ConsumerFactory consumerFactoryMock = mock(ConsumerFactory.class); when(consumerFactoryMock.createConsumer(any())) .thenReturn(consumerSpy); this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner()); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index c0c10f8d604..5740b3f10d2 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -15,10 +15,15 @@ */ package org.apache.storm.kafka.spout; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -39,32 +44,19 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; +import 
org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicAssigner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; - -import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; - -import java.util.HashSet; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashSet; -import org.apache.storm.kafka.spout.subscription.ManualPartitioner; -import org.apache.storm.kafka.spout.subscription.TopicAssigner; -import org.apache.storm.kafka.spout.subscription.TopicFilter; -import org.junit.Rule; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -81,7 +73,7 @@ public class KafkaSpoutRebalanceTest { private TopologyContext contextMock; private SpoutOutputCollector collectorMock; private KafkaConsumer consumerMock; - private KafkaConsumerFactory consumerFactory; + private ConsumerFactory consumerFactory; private TopicFilter topicFilterMock; private ManualPartitioner partitionerMock; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 0b5c580ecd7..0e5d0b982be 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -15,9 +15,14 @@ */ package org.apache.storm.kafka.spout; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,29 +30,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.subscription.ManualPartitioner; +import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; - -import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; - -import org.apache.storm.kafka.spout.subscription.ManualPartitioner; -import org.apache.storm.kafka.spout.subscription.TopicFilter; -import org.junit.Rule; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 646134ad26f..d7febc099d8 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -26,6 +26,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -39,7 +42,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.storm.kafka.KafkaUnitExtension; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -79,7 +82,7 @@ public void setUp() { //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets. 
assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets)); spout = new KafkaSpout<>(spoutConfig); - new KafkaConsumerFactoryDefault().createConsumer(spoutConfig); + new ConsumerFactoryDefault().createConsumer(spoutConfig); } private void prepareSpout(int msgCount) throws Exception { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java index eba9625ccd7..81c489e57f5 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.Map; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; @@ -59,7 +59,7 @@ public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int * that there are only commits on one topic, * and that the committed offset covers messageCount messages */ - public static void verifyAllMessagesCommitted(KafkaConsumer consumerSpy, + public static void verifyAllMessagesCommitted(Consumer consumerSpy, ArgumentCaptor> commitCapture, long messageCount) { verify(consumerSpy, times(1)).commitSync(commitCapture.capture()); Map commits = commitCapture.getValue(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index 2aed18262ef..db92c8421e1 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -41,7 +41,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; import org.apache.storm.kafka.spout.subscription.ManualPartitioner; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.kafka.spout.subscription.TopicFilter; @@ -83,7 +83,7 @@ public static KafkaSpout setupSpout(KafkaSpoutConfig spoutCon }).when(assigner).assignPartitions(any(), any(), any()); when(consumerMock.assignment()).thenReturn(assignedPartitionsSet); - KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + ConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; KafkaSpout spout = new KafkaSpout<>(spoutConfig, consumerFactory, assigner); spout.open(topoConf, contextMock, collectorMock); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java index e348ad38c0f..7f95b744d53 100644 --- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java @@ -16,6 +16,8 @@ package org.apache.storm.kafka.spout.trident; +import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata; + import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -40,24 +42,28 @@ public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception */ long startOffset = 10; long endOffset = 20; + String topologyId = "topologyId"; - KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(startOffset, endOffset); + KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(startOffset, endOffset, topologyId); Map map = metadata.toMap(); Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map)); KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap); assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset())); assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset())); + assertThat(deserializedMetadata.getTopologyId(), is(metadata.getTopologyId())); } @Test public void testCreateMetadataFromRecords() { long firstOffset = 15; long lastOffset = 55; + String topologyId = "topologyId"; List> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1)); - KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records); + KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records, topologyId); assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset)); assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset)); + assertThat(metadata.getTopologyId(), is(topologyId)); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java index 3be0488662d..1ccc9a7226e 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java @@ -19,19 +19,19 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; 
import java.util.HashSet; import java.util.List; @@ -39,24 +39,32 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.ConsumerFactory; import org.apache.storm.kafka.spout.subscription.ManualPartitioner; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.topology.TransactionAttempt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -66,17 +74,27 @@ public class KafkaTridentSpoutEmitterTest { public MockitoRule mockito = MockitoJUnit.rule(); @Captor - public ArgumentCaptor> assignmentCaptor; + public ArgumentCaptor> emitCaptor; + @Mock + public TopologyContext topologyContextMock; + + private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.NONE); private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); + private final String topologyId = "topologyId"; + + @Before + public void setUp() { + when(topologyContextMock.getStormId()).thenReturn(topologyId); + } @Test public void testGetOrderedPartitionsIsConsistent() { KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) .build(), - mock(TopologyContext.class), - mock(KafkaConsumerFactory.class), new TopicAssigner()); + topologyContextMock, + config -> consumer, new TopicAssigner()); Set allPartitions = new HashSet<>(); int numPartitions = 10; @@ -113,8 +131,8 @@ public void testGetPartitionsForTask() { KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1) .build(), - mock(TopologyContext.class), - mock(KafkaConsumerFactory.class), new TopicAssigner()); + topologyContextMock, + config -> consumer, new TopicAssigner()); List allPartitions = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -133,15 +151,13 @@ public void testGetPartitionsForTask() { @Test public void testAssignPartitions() { //Verify correct unwrapping of partitions and delegation of assignment - KafkaConsumer consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory consumerFactory = spoutConfig -> consumerMock; TopicAssigner assignerMock = mock(TopicAssigner.class); KafkaTridentSpoutEmitter emitter = new 
KafkaTridentSpoutEmitter<>( SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) .build(), - mock(TopologyContext.class), - consumerFactory, assignerMock); + topologyContextMock, + config -> consumer, assignerMock); List allPartitions = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -153,61 +169,64 @@ public void testAssignPartitions() { emitter.refreshPartitions(allPartitions); - verify(assignerMock).assignPartitions(any(KafkaConsumer.class), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class)); + verify(assignerMock).assignPartitions(eq(consumer), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class)); } - private Map doEmitBatchTest(KafkaConsumer consumerMock, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map previousBatchMeta) { - when(consumerMock.assignment()).thenReturn(Collections.singleton(tp)); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap( - tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords)))); - KafkaConsumerFactory consumerFactory = spoutConfig -> consumerMock; - - KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( + private KafkaTridentSpoutEmitter createEmitterWithMessages(TopicPartition tp, long firstOffset, int numRecords, FirstPollOffsetStrategy firstPollOffsetStrategy) { + consumer.assign(Collections.singleton(tp)); + //Pretend that the topic offsets start at 0, even if the batch should start with a later offset + consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L)); + List> records = SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords); + records.forEach(record -> consumer.addRecord(record)); + return new KafkaTridentSpoutEmitter<>( SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) - .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) + .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset")) + .setFirstPollOffsetStrategy(firstPollOffsetStrategy) .build(), - mock(TopologyContext.class), - consumerFactory, new TopicAssigner()); + topologyContextMock, + config -> consumer, new TopicAssigner()); + } + + private Map doEmitNewBatchTest(MockConsumer consumer, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map previousBatchMeta) { + KafkaTridentSpoutEmitter emitter = createEmitterWithMessages(tp, firstOffset, numRecords, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST); TransactionAttempt txid = new TransactionAttempt(10L, 0); KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp); - return emitter.emitPartitionBatch(txid, collectorMock, kttp, previousBatchMeta); + return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta); } @Test - public void testEmitBatchWithNullMeta() { + public void testEmitNewBatchWithNullMeta() { //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct - KafkaConsumer consumerMock = mock(KafkaConsumer.class); TridentCollector collectorMock = mock(TridentCollector.class); TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); long firstOffset = 0; int numRecords = 10; - Map batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, null); + Map batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, null); - InOrder inOrder = 
inOrder(consumerMock, collectorMock); - inOrder.verify(consumerMock).seekToBeginning(Collections.singleton(tp)); - inOrder.verify(consumerMock).poll(anyLong()); - inOrder.verify(collectorMock, times(numRecords)).emit(anyList()); + verify(collectorMock, times(numRecords)).emit(emitCaptor.capture()); + List> emits = emitCaptor.getAllValues(); + assertThat(emits.get(0).get(0), is(firstOffset)); + assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1)); KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta); assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset)); assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1)); } @Test - public void testEmitBatchWithPreviousMeta() { + public void testEmitNewBatchWithPreviousMeta() { //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct - KafkaConsumer consumerMock = mock(KafkaConsumer.class); TridentCollector collectorMock = mock(TridentCollector.class); TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); long firstOffset = 50; int numRecords = 10; - KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1); - Map batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap()); + KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1, topologyId); + Map batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap()); - InOrder inOrder = inOrder(consumerMock, collectorMock); - inOrder.verify(consumerMock).seek(tp, firstOffset); - inOrder.verify(consumerMock).poll(anyLong()); - inOrder.verify(collectorMock, times(numRecords)).emit(anyList()); + verify(collectorMock, times(numRecords)).emit(emitCaptor.capture()); + List> emits = emitCaptor.getAllValues(); + assertThat(emits.get(0).get(0), is(firstOffset)); + assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1)); KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta); assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset)); assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1)); @@ -221,7 +240,7 @@ public void testEmitEmptyBatches() throws Exception { TridentCollector collectorMock = mock(TridentCollector.class); TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); when(consumerMock.assignment()).thenReturn(Collections.singleton(tp)); - KafkaConsumerFactory consumerFactory = spoutConfig -> consumerMock; + ConsumerFactory consumerFactory = spoutConfig -> consumerMock; KafkaTridentSpoutEmitter emitter = new KafkaTridentSpoutEmitter<>( SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) @@ -235,7 +254,7 @@ public void testEmitEmptyBatches() throws Exception { clearInvocations(consumerMock); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); TransactionAttempt txid = new 
-            lastBatchMeta = emitter.emitPartitionBatch(txid, collectorMock, kttp, lastBatchMeta);
+            lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
             assertThat(lastBatchMeta, nullValue());
             if (i == 0) {
                 InOrder inOrder = inOrder(consumerMock, collectorMock);
@@ -251,7 +270,7 @@ public void testEmitEmptyBatches() throws Exception {
         int numRecords = 10;
         when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
             tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
-        lastBatchMeta = emitter.emitPartitionBatch(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+        lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
 
         verify(consumerMock).poll(anyLong());
         verify(collectorMock, times(numRecords)).emit(anyList());
@@ -259,5 +278,43 @@ public void testEmitEmptyBatches() throws Exception {
         assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
         assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
     }
+
+    @Test
+    public void testReEmitBatch() {
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 50;
+        int numRecordsEmitted = 10;
+        //Make sure the consumer can return extra records, so we test that re-emit only emits the original messages
+        int numRecordsPresent = 100;
+        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, topologyId);
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsPresent, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+        verify(collectorMock, times(numRecordsEmitted)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecordsEmitted - 1));
+    }
+
+    @Test
+    public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
+        //In some cases users will want to drop retrying old batches, e.g. if the topology should start over from scratch.
+        //If the FirstPollOffsetStrategy ignores committed offsets, we should not retry batches for old topologies
+        //The batch retry should be skipped entirely
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 50;
+        int numRecordsEmitted = 10;
+        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, "a new storm id");
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsEmitted, FirstPollOffsetStrategy.EARLIEST);
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+        verify(collectorMock, never()).emit(anyList());
+    }
 
 }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
index 1abe551b5f1..6f46a36815f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -16,10 +16,11 @@
 
 package org.apache.storm.kafka.spout.trident;
 
+
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -54,7 +55,7 @@ public void testCanGetPartitions() {
         KafkaSpoutConfig<String, String> spoutConfig =
             SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
                 .build();
-        KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+        KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
 
         List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
 
@@ -82,7 +83,7 @@ public void testCanUpdatePartitions() {
         KafkaSpoutConfig<String, String> spoutConfig =
             SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
                 .build();
-        KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+        KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
 
         List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
 
@@ -91,7 +92,7 @@ public void testCanUpdatePartitions() {
         verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
         assertThat(firstBatchTps, contains(expectedPartition));
 
-        Time.advanceTime(KafkaTridentSpoutOpaqueCoordinator.TIMER_DELAY_MS + spoutConfig.getPartitionRefreshPeriodMs());
+        Time.advanceTime(KafkaTridentSpoutCoordinator.TIMER_DELAY_MS + spoutConfig.getPartitionRefreshPeriodMs());
 
         List<Map<String, Object>> partitionsForSecondBatch = coordinator.getPartitionsForBatch();