From 11fcaff0e3f807419db2609bab07e0efb47f8df2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 20 Sep 2016 13:53:55 +0200 Subject: [PATCH] STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout --- external/storm-kafka-client/pom.xml | 10 +- .../apache/storm/kafka/spout/KafkaSpout.java | 46 +++-- .../storm/kafka/spout/KafkaSpoutConfig.java | 8 +- .../kafka/spout/SerializableDeserializer.java | 25 +++ .../spout/internal/KafkaConsumerFactory.java | 27 +++ .../internal/KafkaConsumerFactoryDefault.java | 29 +++ .../kafka/spout/KafkaSpoutRebalanceTest.java | 166 ++++++++++++++++++ .../SingleTopicKafkaSpoutConfiguration.java | 93 ++++++++++ .../builders/TopicKeyValueTupleBuilder.java | 40 +++++ 9 files changed, 427 insertions(+), 17 deletions(-) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index f7a387cb6e0..34b9fdcb2f2 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -57,13 +57,19 @@ org.mockito - mockito-all + mockito-core + test + ${mockito.version} + + + org.hamcrest + hamcrest-all + 1.3 test junit junit - 4.11 test diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4389acb50e5..553408266bb 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -52,6 +52,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; public class KafkaSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); @@ -62,6 +64,7 @@ public class KafkaSpout extends BaseRichSpout { // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; + private final KafkaConsumerFactory kafkaConsumerFactory; private transient KafkaConsumer kafkaConsumer; private transient boolean consumerAutoCommitMode; @@ -84,8 +87,14 @@ public class KafkaSpout extends BaseRichSpout { public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault()); + } + + //This constructor is here for testing + KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory) { this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); + this.kafkaConsumerFactory = kafkaConsumerFactory; } @Override @@ -145,6 +154,10 @@ private void initialize(Collection partitions) { } retryService.retainAll(partitions); + + //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted. + Set partitionsSet = new HashSet(partitions); + emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition())); for (TopicPartition tp : partitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); @@ -286,15 +299,18 @@ private void emitTupleIfNotEmitted(ConsumerRecord record) { LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); - } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried - final List tuple = tuplesBuilder.buildTuple(record); - kafkaSpoutStreams.emit(collector, tuple, msgId); - emitted.add(msgId); - numUncommittedOffsets++; - if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ? - retryService.remove(msgId); // re-emitted hence remove from failed + } else { + boolean isScheduled = retryService.isScheduled(msgId); + if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried + final List tuple = tuplesBuilder.buildTuple(record); + kafkaSpoutStreams.emit(collector, tuple, msgId); + emitted.add(msgId); + numUncommittedOffsets++; + if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule. + retryService.remove(msgId); + } + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); } - LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); } } @@ -328,6 +344,11 @@ private void commitOffsetsForAckedTuples() { @Override public void ack(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + if(!emitted.contains(msgId)) { + LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); + return; + } + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically acked.get(msgId.getTopicPartition()).add(msgId); } @@ -339,8 +360,12 @@ public void ack(Object messageId) { @Override public void fail(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; - emitted.remove(msgId); + if(!emitted.contains(msgId)) { + LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); + return; + } if (msgId.numFails() < maxRetries) { + emitted.remove(msgId); msgId.incrementNumFails(); retryService.schedule(msgId); } else { // limit to max number of retries @@ -357,8 +382,7 @@ public void activate() { } private void subscribeKafkaConsumer() { - kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), - kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { final List topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 7c97ac9b04c..8aa525bdc77 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -105,8 +105,8 @@ private Map setDefaultsAndGetKafkaProps(Map kafk public static class Builder { private final Map kafkaProps; - private Deserializer keyDeserializer; - private Deserializer valueDeserializer; + private SerializableDeserializer keyDeserializer; + private SerializableDeserializer valueDeserializer; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; @@ -164,7 +164,7 @@ public Builder(Map kafkaProps, KafkaSpoutStreams kafkaSpoutStrea /** * Specifying this key deserializer overrides the property key.deserializer */ - public Builder setKeyDeserializer(Deserializer keyDeserializer) { + public Builder setKeyDeserializer(SerializableDeserializer keyDeserializer) { this.keyDeserializer = keyDeserializer; return this; } @@ -172,7 +172,7 @@ public Builder setKeyDeserializer(Deserializer keyDeserializer) { /** * Specifying this value deserializer overrides the property value.deserializer */ - public Builder setValueDeserializer(Deserializer valueDeserializer) { + public Builder setValueDeserializer(SerializableDeserializer valueDeserializer) { this.valueDeserializer = valueDeserializer; return this; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java new file mode 100644 index 00000000000..eb76a90697e --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java @@ -0,0 +1,25 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * @param The type this deserializer deserializes to. + */ +public interface SerializableDeserializer extends Deserializer, Serializable { +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java new file mode 100644 index 00000000000..0b253b41f89 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import java.io.Serializable; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; + +/** + * This is here to enable testing + */ +public interface KafkaConsumerFactory extends Serializable { + public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig); +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java new file mode 100644 index 00000000000..79003882869 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; + +public class KafkaConsumerFactoryDefault implements KafkaConsumerFactory { + + @Override + public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { + return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java new file mode 100644 index 00000000000..b5428f968a0 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.when; +import org.junit.Before; +import org.mockito.Captor; +import static org.mockito.Mockito.reset; +import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class KafkaSpoutRebalanceTest { + + @Captor + private ArgumentCaptor> commitCapture; + + private TopologyContext contextMock; + private SpoutOutputCollector collectorMock; + private Map conf; + private KafkaConsumer consumerMock; + private KafkaConsumerFactory consumerFactoryMock; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + contextMock = mock(TopologyContext.class); + collectorMock = mock(SpoutOutputCollector.class); + conf = new HashMap<>(); + consumerMock = mock(KafkaConsumer.class); + consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock; + } + + //Returns messageIds in order of emission + private List emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) { + //Setup spout with mock consumer so we can get at the rebalance listener + spout.open(conf, contextMock, collectorMock); + spout.activate(); + + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + + //Assign partitions to the spout + ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); + List assignedPartitions = new ArrayList<>(); + assignedPartitions.add(partitionThatWillBeRevoked); + assignedPartitions.add(assignedPartition); + consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); + + //Make the consumer return a single message for each partition + Map>> firstPartitionRecords = new HashMap<>(); + firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value"))); + Map>> secondPartitionRecords = new HashMap<>(); + secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords(firstPartitionRecords)) + .thenReturn(new ConsumerRecords(secondPartitionRecords)) + .thenReturn(new ConsumerRecords(Collections.emptyMap())); + + //Emit the messages + spout.nextTuple(); + ArgumentCaptor messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture()); + reset(collectorMock); + spout.nextTuple(); + ArgumentCaptor messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture()); + + //Now rebalance + consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); + consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition)); + + List emittedMessageIds = new ArrayList<>(); + emittedMessageIds.add(messageIdForRevokedPartition.getValue()); + emittedMessageIds.add(messageIdForAssignedPartition.getValue()); + return emittedMessageIds; + } + + @Test + public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { + //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock); + String topic = SingleTopicKafkaSpoutConfiguration.topic; + TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); + TopicPartition assignedPartition = new TopicPartition(topic, 2); + + //Emit a message on each partition and revoke the first partition + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + + //Ack both emitted tuples + spout.ack(emittedMessageIds.get(0)); + spout.ack(emittedMessageIds.get(1)); + + //Ensure the commit timer has expired + Thread.sleep(510); + + //Make the spout commit any acked tuples + spout.nextTuple(); + //Verify that it only committed the message on the assigned partition + verify(consumerMock).commitSync(commitCapture.capture()); + + Map commitCaptureMap = commitCapture.getValue(); + assertThat(commitCaptureMap, hasKey(assignedPartition)); + assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked))); + } + + @Test + public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { + //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass + KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock); + String topic = SingleTopicKafkaSpoutConfiguration.topic; + TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); + TopicPartition assignedPartition = new TopicPartition(topic, 2); + + //Emit a message on each partition and revoke the first partition + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + + //Fail both emitted tuples + spout.fail(emittedMessageIds.get(0)); + spout.fail(emittedMessageIds.get(1)); + + //Check that only the tuple on the currently assigned partition is retried + verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); + verify(retryServiceMock).schedule(emittedMessageIds.get(1)); + } +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java new file mode 100644 index 00000000000..157543cc216 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.builders; + +import org.apache.storm.Config; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.*; +import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +public class SingleTopicKafkaSpoutConfiguration { + public static final String stream = "test_stream"; + public static final String topic = "test"; + + static public Config getConfig() { + Config config = new Config(); + config.setDebug(true); + return config; + } + + static public StormTopology getTopologyKafkaSpout(int port) { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", stream); + return tp.createTopology(); + } + + static public KafkaSpoutConfig getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) { + return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000); + } + + static public KafkaSpoutConfig getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) { + return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService()); + } + + static public KafkaSpoutConfig getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) { + return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .setPollTimeoutMs(1000) + .build(); + } + + static protected KafkaSpoutRetryService getRetryService() { + return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0), + KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + } + + static protected Map getKafkaConsumerProps(int port) { + Map props = new HashMap<>(); + props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port); + props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup"); + props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("max.poll.records", "5"); + return props; + } + + static protected KafkaSpoutTuplesBuilder getTuplesBuilder() { + return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>( + new TopicKeyValueTupleBuilder(topic)) + .build(); + } + + static public KafkaSpoutStreams getKafkaSpoutStreams() { + final Fields outputFields = new Fields("topic", "key", "value"); + return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, stream, new String[]{topic}) // contents of topics test sent to test_stream + .build(); + } +} \ No newline at end of file diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java new file mode 100644 index 00000000000..d6b8a2ab88f --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.builders; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.tuple.Values; + +import java.util.List; + +public class TopicKeyValueTupleBuilder extends KafkaSpoutTupleBuilder { + /** + * @param topics list of topics that use this implementation to build tuples + */ + public TopicKeyValueTupleBuilder(String... topics) { + super(topics); + } + + @Override + public List buildTuple(ConsumerRecord consumerRecord) { + return new Values(consumerRecord.topic(), + consumerRecord.key(), + consumerRecord.value()); + } +} \ No newline at end of file