diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 929c8ea48b7..a7d7b23544c 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -151,26 +151,12 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.2.1</version>
-
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml
index 28d72392489..48cc1519057 100644
--- a/external/flux/flux-examples/pom.xml
+++ b/external/flux/flux-examples/pom.xml
@@ -95,18 +95,7 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
+            <artifactId>${kafka.artifact.id}</artifactId>
         </dependency>
diff --git a/external/flux/pom.xml b/external/flux/pom.xml
index 1fd1683c6ed..56d9babc076 100644
--- a/external/flux/pom.xml
+++ b/external/flux/pom.xml
@@ -78,19 +78,8 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
index 450611ec837..0642d179d0b 100644
--- a/external/sql/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -63,26 +63,12 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.2.1</version>
-
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
new file mode 100644
index 00000000000..8ac15f524ba
--- /dev/null
+++ b/external/storm-kafka-client/README.md
@@ -0,0 +1,9 @@
+# Storm Kafka Spout New Consumer API
+
+This patch is still under development and comes with no warranties at this time.
+
+It has not been thoroughly tested, so it may contain bugs and is not ready for production use.
+
+Documentation will be uploaded soon.
+
+To see how to use the new Kafka spout, please refer to the example under the tests. Thank you!
\ No newline at end of file
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
new file mode 100644
index 00000000000..6c82b6a57b7
--- /dev/null
+++ b/external/storm-kafka-client/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-kafka-client</artifactId>
+    <name>storm-kafka-client</name>
+    <packaging>jar</packaging>
+
+    <developers>
+        <developer>
+            <id>hmcl</id>
+            <name>Hugo Louro</name>
+            <email>hmclouro@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
new file mode 100644
index 00000000000..d211ae915a1
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); + + // Storm + protected SpoutOutputCollector collector; + + // Kafka + private final KafkaSpoutConfig kafkaSpoutConfig; + private transient KafkaConsumer kafkaConsumer; + private transient boolean consumerAutoCommitMode; + + + // Bookkeeping + private transient int maxRetries; // Max number of times a tuple is retried + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation + private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure + private transient Timer commitTimer; // timer == null for auto commit mode + private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. 
+ // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() + + private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples + private transient KafkaSpoutTuplesBuilder tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord + + private transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate + private transient Set emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed + private transient Iterator> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() + private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + + + public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration + this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + initialized = false; + + // Spout internals + this.collector = collector; + maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); + numUncommittedOffsets = 0; + + // Offset management + firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + + // Retries management + retryService = kafkaSpoutConfig.getRetryService(); + + // Tuples builder delegate + tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder(); + + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually + commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); + } + + acked = new HashMap<>(); + emitted = new HashSet<>(); + waitingToEmit = Collections.emptyListIterator(); + + LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); + } + + // =========== Consumer Rebalance Listener - On the same thread as the caller =========== + + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection partitions) { + LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + if (!consumerAutoCommitMode && initialized) { + initialized = false; + commitOffsetsForAckedTuples(); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + LOG.debug("Partitions reassignment. 
[consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + + initialize(partitions); + } + + private void initialize(Collection partitions) { + if (!consumerAutoCommitMode) { + acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout + } + + retryService.retainAll(partitions); + + for (TopicPartition tp : partitions) { + final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); + final long fetchOffset = doSeek(tp, committedOffset); + setAcked(tp, fetchOffset); + } + initialized = true; + LOG.debug("Initialization complete"); + } + + /** + * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + */ + private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { + long fetchOffset; + if (committedOffset != null) { // offset was committed for this TopicPartition + if (firstPollOffsetStrategy.equals(EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + fetchOffset = kafkaConsumer.position(tp); + } else if (firstPollOffsetStrategy.equals(LATEST)) { + kafkaConsumer.seekToEnd(tp); + fetchOffset = kafkaConsumer.position(tp); + } else { + // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. + fetchOffset = committedOffset.offset() + 1; + kafkaConsumer.seek(tp, fetchOffset); + } + } else { // no commits have ever been done, so start at the beginning or end depending on the strategy + if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { + kafkaConsumer.seekToEnd(tp); + } + fetchOffset = kafkaConsumer.position(tp); + } + return fetchOffset; + } + } + + private void setAcked(TopicPartition tp, long fetchOffset) { + // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off + if (!consumerAutoCommitMode && !acked.containsKey(tp)) { + acked.put(tp, new OffsetEntry(tp, fetchOffset)); + } + } + + // ======== Next Tuple ======= + + @Override + public void nextTuple() { + if (initialized) { + if (commit()) { + commitOffsetsForAckedTuples(); + } + + if (poll()) { + setWaitingToEmit(pollKafkaBroker()); + } + + if (waitingToEmit()) { + emit(); + } + } else { + LOG.debug("Spout not initialized. 
Not sending tuples until initialization completes"); + } + } + + private boolean commit() { + return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + } + + private boolean poll() { + return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); + } + + private boolean waitingToEmit() { + return waitingToEmit != null && waitingToEmit.hasNext(); + } + + public void setWaitingToEmit(ConsumerRecords consumerRecords) { + List> waitingToEmitList = new LinkedList<>(); + for (TopicPartition tp : consumerRecords.partitions()) { + waitingToEmitList.addAll(consumerRecords.records(tp)); + } + waitingToEmit = waitingToEmitList.iterator(); + LOG.trace("Records waiting to be emitted {}", waitingToEmitList); + } + + // ======== poll ========= + private ConsumerRecords pollKafkaBroker() { + doSeekRetriableTopicPartitions(); + + final ConsumerRecords consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final int numPolledRecords = consumerRecords.count(); + LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets); + return consumerRecords; + } + + private void doSeekRetriableTopicPartitions() { + final Set retriableTopicPartitions = retryService.retriableTopicPartitions(); + + for (TopicPartition rtp : retriableTopicPartitions) { + final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); + if (offsetAndMeta != null) { + kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle + } else { + kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset + } + } + } + + // ======== emit ========= + private void emit() { + emitTupleIfNotEmitted(waitingToEmit.next()); + waitingToEmit.remove(); + } + + // emits one tuple per record + private void emitTupleIfNotEmitted(ConsumerRecord record) { + final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); + + if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked + LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); + } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail + LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); + } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried + final List tuple = tuplesBuilder.buildTuple(record); + kafkaSpoutStreams.emit(collector, tuple, msgId); + emitted.add(msgId); + numUncommittedOffsets++; + if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ? 
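+                    // isReady(msgId) is true only if this tuple failed earlier and its retry backoff has
+                    // expired; it is now being re-emitted, so it no longer counts as a scheduled retry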
+ retryService.remove(msgId); // re-emitted hence remove from failed + } + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); + } + } + + private void commitOffsetsForAckedTuples() { + // Find offsets that are ready to be committed for every topic partition + final Map nextCommitOffsets = new HashMap<>(); + for (Map.Entry tpOffset : acked.entrySet()) { + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); + if (nextCommitOffset != null) { + nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); + } + } + + // Commit offsets that are ready to be committed for every topic partition + if (!nextCommitOffsets.isEmpty()) { + kafkaConsumer.commitSync(nextCommitOffsets); + LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); + // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition + // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop + for (Map.Entry tpOffset : acked.entrySet()) { + final OffsetEntry offsetEntry = tpOffset.getValue(); + offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey())); + } + } else { + LOG.trace("No offsets to commit. {}", this); + } + } + + // ======== Ack ======= + + @Override + public void ack(Object messageId) { + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically + acked.get(msgId.getTopicPartition()).add(msgId); + LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked); + } + emitted.remove(msgId); + } + + // ======== Fail ======= + + @Override + public void fail(Object messageId) { + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + emitted.remove(msgId); + if (msgId.numFails() < maxRetries) { + msgId.incrementNumFails(); + retryService.schedule(msgId); + } else { // limit to max number of retries + LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); + ack(msgId); + } + } + + // ======== Activate / Deactivate / Close / Declare Outputs ======= + + @Override + public void activate() { + subscribeKafkaConsumer(); + } + + private void subscribeKafkaConsumer() { + kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener()); + // Initial poll to get the consumer registration process going. 
+ // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + kafkaConsumer.poll(0); + } + + @Override + public void deactivate() { + shutdown(); + } + + @Override + public void close() { + shutdown(); + } + + private void shutdown() { + try { + kafkaConsumer.wakeup(); + if (!consumerAutoCommitMode) { + commitOffsetsForAckedTuples(); + } + } finally { + //remove resources + kafkaConsumer.close(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + kafkaSpoutStreams.declareOutputFields(declarer); + } + + @Override + public String toString() { + return "{acked=" + acked + "} "; + } + + // ======= Offsets Commit Management ========== + + private static class OffsetComparator implements Comparator { + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; + } + } + + /** + * This class is not thread safe + */ + private class OffsetEntry { + private final TopicPartition tp; + private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. + * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ + private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1 + private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset + + public OffsetEntry(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.initialFetchOffset = initialFetchOffset; + this.committedOffset = initialFetchOffset - 1; + LOG.debug("Instantiated {}", this); + } + + public void add(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + /** + * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit. + */ + public OffsetAndMetadata findNextCommitOffset() { + boolean found = false; + long currOffset; + long nextCommitOffset = committedOffset; + KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata + + for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap + if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit + found = true; + nextCommitMsg = currAckedMsg; + nextCommitOffset = currOffset; + LOG.trace("Found offset to commit [{}]. {}", currOffset, this); + } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search + LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this); + break; + } else { + LOG.debug("Unexpected offset found [{}]. {}", currOffset, this); + break; + } + } + + OffsetAndMetadata nextCommitOffsetAndMetadata = null; + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); + LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this); + } else { + LOG.debug("No offsets ready to commit. {}", this); + } + return nextCommitOffsetAndMetadata; + } + + /** + * Marks an offset has committed. 
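+         * For example, if offsets 5, 6, and 7 are acked and the commit call reports offset 7 as committed,
+         * all three are pruned from the acked set and the internal committed offset advances to 7.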
This method has side effects - it sets the internal state in such a way that future + * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any. + * + * @param committedOffset offset to be marked as committed + */ + public void commit(OffsetAndMetadata committedOffset) { + if (committedOffset != null) { + final long numCommittedOffsets = committedOffset.offset() - this.committedOffset; + this.committedOffset = committedOffset.offset(); + for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext(); ) { + if (iterator.next().offset() <= committedOffset.offset()) { + iterator.remove(); + } else { + break; + } + } + numUncommittedOffsets-= numCommittedOffsets; + } + LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets); + } + + public boolean isEmpty() { + return ackedMsgs.isEmpty(); + } + + public boolean contains(ConsumerRecord record) { + return contains(new KafkaSpoutMessageId(record)); + } + + public boolean contains(KafkaSpoutMessageId msgId) { + return ackedMsgs.contains(msgId); + } + + @Override + public String toString() { + return "OffsetEntry{" + + "topic-partition=" + tp + + ", fetchOffset=" + initialFetchOffset + + ", committedOffset=" + committedOffset + + ", ackedMsgs=" + ackedMsgs + + '}'; + } + } + + // =========== Timer =========== + + private class Timer { + private final long delay; + private final long period; + private final TimeUnit timeUnit; + private final long periodNanos; + private long start; + + /** + * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link + * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns + * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay. + * + * @param delay the initial delay before the timer starts + * @param period the period between calls {@link #isExpiredResetOnTrue()} + * @param timeUnit the time unit of delay and period + */ + public Timer(long delay, long period, TimeUnit timeUnit) { + this.delay = delay; + this.period = period; + this.timeUnit = timeUnit; + + periodNanos = timeUnit.toNanos(period); + start = System.nanoTime() + timeUnit.toNanos(delay); + } + + public long period() { + return period; + } + + public long delay() { + return delay; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the + * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset + * (re-initiated) and a new cycle will start. + * + * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false + * otherwise. 
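+         * <br/>
+         * A minimal usage sketch, which is how this spout drives its commit timer from nextTuple()
+         * (the 500 ms delay and 200 ms period are arbitrary illustrative values):
+         * <pre>{@code
+         * Timer timer = new Timer(500, 200, TimeUnit.MILLISECONDS);
+         * if (timer.isExpiredResetOnTrue()) {
+         *     // at least 200 ms have elapsed since the last expiration; do the periodic work here
+         * }
+         * }</pre>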
+ */ + public boolean isExpiredResetOnTrue() { + final boolean expired = System.nanoTime() - start > periodNanos; + if (expired) { + start = System.nanoTime(); + } + return expired; + } + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java new file mode 100644 index 00000000000..29cedb2247a --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics + */ +public class KafkaSpoutConfig implements Serializable { + public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000; // 2s + public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000; // 15s + public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever + public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000; // 10,000 records + + // Kafka property names + public interface Consumer { + String GROUP_ID = "group.id"; + String BOOTSTRAP_SERVERS = "bootstrap.servers"; + String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; + String KEY_DESERIALIZER = "key.deserializer"; + String VALUE_DESERIALIZER = "value.deserializer"; + } + + /** + * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will + * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST.
+     * <br/>
+     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST.
+     * <ul>
+     * <li>EARLIEST means that the Kafka spout polls records starting at the first offset of the partition, regardless of previous commits</li>
+     * <li>LATEST means that the Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
+     * <li>UNCOMMITTED_EARLIEST means that the Kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as EARLIEST.</li>
+     * <li>UNCOMMITTED_LATEST means that the Kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as LATEST.</li>
+     * </ul>
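+     * <br/>
+     * As a usage sketch (the {@code kafkaProps}, {@code kafkaSpoutStreams}, {@code tuplesBuilder} and
+     * {@code retryService} builder arguments are assumed to be constructed elsewhere):
+     * <pre>{@code
+     * KafkaSpoutConfig<String, String> config =
+     *     new KafkaSpoutConfig.Builder<String, String>(kafkaProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+     *         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+     *         .build();
+     * }</pre>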
+ * */ + public enum FirstPollOffsetStrategy { + EARLIEST, + LATEST, + UNCOMMITTED_EARLIEST, + UNCOMMITTED_LATEST } + + // Kafka consumer configuration + private final Map kafkaProps; + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + private final long pollTimeoutMs; + + // Kafka spout configuration + private final long offsetCommitPeriodMs; + private final int maxRetries; + private final int maxUncommittedOffsets; + private final FirstPollOffsetStrategy firstPollOffsetStrategy; + private final KafkaSpoutStreams kafkaSpoutStreams; + private final KafkaSpoutTuplesBuilder tuplesBuilder; + private final KafkaSpoutRetryService retryService; + + private KafkaSpoutConfig(Builder builder) { + this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); + this.keyDeserializer = builder.keyDeserializer; + this.valueDeserializer = builder.valueDeserializer; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; + this.maxRetries = builder.maxRetries; + this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; + this.kafkaSpoutStreams = builder.kafkaSpoutStreams; + this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + this.tuplesBuilder = builder.tuplesBuilder; + this.retryService = builder.retryService; + } + + private Map setDefaultsAndGetKafkaProps(Map kafkaProps) { + // set defaults for properties not specified + if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { + kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); + } + return kafkaProps; + } + + public static class Builder { + private final Map kafkaProps; + private Deserializer keyDeserializer; + private Deserializer valueDeserializer; + private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; + private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; + private int maxRetries = DEFAULT_MAX_RETRIES; + private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; + private final KafkaSpoutStreams kafkaSpoutStreams; + private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; + private final KafkaSpoutTuplesBuilder tuplesBuilder; + private final KafkaSpoutRetryService retryService; + + /** + * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.
+         * <br/>
+         * This constructor uses by default the following implementation for {@link KafkaSpoutRetryService}:
+         * <br/>
+         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+         * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))}
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K, V> tuplesBuilder) {
+            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
+                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+        }
+
+        /**
+         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribed topics.
+         * The optional configuration can be specified using the set methods of this builder.
+         * @param kafkaProps properties defining the consumer connection to the Kafka broker, as specified by {@code KafkaConsumer}
+         * @param kafkaSpoutStreams streams to which the tuples are emitted for each topic. Multiple topics can emit to the same stream.
+         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
+         * @param retryService logic that manages the retrial of failed tuples
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K, V> tuplesBuilder, KafkaSpoutRetryService retryService) {
+            if (kafkaProps == null || kafkaProps.isEmpty()) {
+                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
+            }
+
+            if (kafkaSpoutStreams == null) {
+                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
+            }
+
+            if (tuplesBuilder == null) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+            }
+
+            if (retryService == null) {
+                throw new IllegalArgumentException("Must specify an implementation of retry service");
+            }
+
+            this.kafkaProps = kafkaProps;
+            this.kafkaSpoutStreams = kafkaSpoutStreams;
+            this.tuplesBuilder = tuplesBuilder;
+            this.retryService = retryService;
+        }
+
+        /**
+         * Specifying this key deserializer overrides the property key.deserializer
+         */
+        public Builder setKeyDeserializer(Deserializer<K> keyDeserializer) {
+            this.keyDeserializer = keyDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifying this value deserializer overrides the property value.deserializer
+         */
+        public Builder setValueDeserializer(Deserializer<V> valueDeserializer) {
+            this.valueDeserializer = valueDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s.
+         * @param pollTimeoutMs time in ms
+         */
+        public Builder setPollTimeoutMs(long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Specifies the period, in milliseconds, at which the offset commit task is periodically called. Default is 15s.
+         * @param offsetCommitPeriodMs time in ms
+         */
+        public Builder setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+            return this;
+        }
+
+        /**
+         * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
+         * no new records are committed until the previously polled records have been acked. This guarantees at-least-once delivery of
+         * all the previously polled records.
+         * By specifying a finite value for maxRetries, the user decides to sacrifice the delivery guarantee for the previously
+         * polled records in favor of processing more records.
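+         * <br/>
+         * For example, a hypothetical {@code builder.setMaxRetries(5)} re-emits each failed tuple at most five
+         * times before the spout gives up on it and acks it.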
+ * @param maxRetries max number of retrials + */ + public Builder setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. + * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number + * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. + * @param maxUncommittedOffsets max number of records that can be be pending commit + */ + public Builder setMaxUncommittedOffsets(int maxUncommittedOffsets) { + this.maxUncommittedOffsets = maxUncommittedOffsets; + return this; + } + + /** + * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. + * Please refer to to the documentation in {@link FirstPollOffsetStrategy} + * @param firstPollOffsetStrategy Offset used by Kafka spout first poll + * */ + public Builder setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) { + this.firstPollOffsetStrategy = firstPollOffsetStrategy; + return this; + } + + public KafkaSpoutConfig build() { + return new KafkaSpoutConfig<>(this); + } + } + + public Map getKafkaProps() { + return kafkaProps; + } + + public Deserializer getKeyDeserializer() { + return keyDeserializer; + } + + public Deserializer getValueDeserializer() { + return valueDeserializer; + } + + public long getPollTimeoutMs() { + return pollTimeoutMs; + } + + public long getOffsetsCommitPeriodMs() { + return offsetCommitPeriodMs; + } + + public boolean isConsumerAutoCommitMode() { + return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true + || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT)); + } + + public String getConsumerGroupId() { + return (String) kafkaProps.get(Consumer.GROUP_ID); + } + + /** + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + */ + public List getSubscribedTopics() { + return new ArrayList<>(kafkaSpoutStreams.getTopics()); + } + + public int getMaxTupleRetries() { + return maxRetries; + } + + public FirstPollOffsetStrategy getFirstPollOffsetStrategy() { + return firstPollOffsetStrategy; + } + + public KafkaSpoutStreams getKafkaSpoutStreams() { + return kafkaSpoutStreams; + } + + public int getMaxUncommittedOffsets() { + return maxUncommittedOffsets; + } + + public KafkaSpoutTuplesBuilder getTuplesBuilder() { + return tuplesBuilder; + } + + public KafkaSpoutRetryService getRetryService() { + return retryService; + } + + @Override + public String toString() { + return "KafkaSpoutConfig{" + + "kafkaProps=" + kafkaProps + + ", keyDeserializer=" + keyDeserializer + + ", valueDeserializer=" + valueDeserializer + + ", topics=" + getSubscribedTopics() + + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + + ", pollTimeoutMs=" + pollTimeoutMs + + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + + ", maxRetries=" + maxRetries + + '}'; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java new file mode 100644 index 00000000000..71f83276059 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +public class KafkaSpoutMessageId { + private transient TopicPartition topicPart; + private transient long offset; + private transient int numFails = 0; + + public KafkaSpoutMessageId(ConsumerRecord consumerRecord) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()); + } + + public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { + this.topicPart = topicPart; + this.offset = offset; + } + + public int partition() { + return topicPart.partition(); + } + + public String topic() { + return topicPart.topic(); + } + + public long offset() { + return offset; + } + + public int numFails() { + return numFails; + } + + public void incrementNumFails() { + ++numFails; + } + + public TopicPartition getTopicPartition() { + return topicPart; + } + + public String getMetadata(Thread currThread) { + return "{" + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + ", thread='" + currThread.getName() + "'" + + '}'; + } + + @Override + public String toString() { + return "{" + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o; + if (offset != messageId.offset) { + return false; + } + return topicPart.equals(messageId.topicPart); + } + + @Override + public int hashCode() { + int result = topicPart.hashCode(); + result = 31 * result + (int) (offset ^ (offset >>> 32)); + return result; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java new file mode 100644 index 00000000000..208cef4fb99 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows: + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ... + * nextRetry = Min(nextRetry, currentTime + maxDelay) + */ +public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class); + private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator(); + + private TimeInterval initialDelay; + private TimeInterval delayPeriod; + private TimeInterval maxDelay; + private int maxRetries; + + private Set retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); + private Set toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups + + /** + * Comparator ordering by timestamp + */ + private static class RetryEntryTimeStampComparator implements Serializable, Comparator { + public int compare(RetrySchedule entry1, RetrySchedule entry2) { + return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos()); + } + } + + private class RetrySchedule { + private KafkaSpoutMessageId msgId; + private long nextRetryTimeNanos; + + public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) { + this.msgId = msgId; + this.nextRetryTimeNanos = nextRetryTime; + LOG.debug("Created {}", this); + } + + public void setNextRetryTime() { + nextRetryTimeNanos = nextTime(msgId); + LOG.debug("Updated {}", this); + } + + public boolean retry(long currentTimeNanos) { + return nextRetryTimeNanos <= currentTimeNanos; + } + + @Override + public String toString() { + return "RetrySchedule{" + + "msgId=" + msgId + + ", nextRetryTime=" + nextRetryTimeNanos + + '}'; + } + + public KafkaSpoutMessageId msgId() { + return msgId; + } + + public long nextRetryTimeNanos() { + return nextRetryTimeNanos; + } + } + + public static class TimeInterval implements Serializable { + private long lengthNanos; + private long length; + private TimeUnit timeUnit; + + /** + * @param length length of the time interval in the units specified by {@link TimeUnit} + * @param timeUnit unit used to specify a time interval on which to specify a time unit + */ + public TimeInterval(long length, TimeUnit timeUnit) { + this.length = length; + this.timeUnit = timeUnit; + this.lengthNanos = timeUnit.toNanos(length); + } + + public static TimeInterval seconds(long length) { + return new TimeInterval(length, TimeUnit.SECONDS); + } + + public static TimeInterval milliSeconds(long length) { + return new 
TimeInterval(length, TimeUnit.MILLISECONDS);
+        }
+
+        public static TimeInterval microSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MICROSECONDS);
+        }
+
+        public long lengthNanos() {
+            return lengthNanos;
+        }
+
+        public long length() {
+            return length;
+        }
+
+        public TimeUnit timeUnit() {
+            return timeUnit;
+        }
+
+        @Override
+        public String toString() {
+            return "TimeInterval{" +
+                    "length=" + length +
+                    ", timeUnit=" + timeUnit +
+                    '}';
+        }
+    }
+
+    /**
+     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
+     * where failCount = 1, 2, 3, ...
+     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     *
+     * @param initialDelay initial delay of the first retry
+     * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
+     * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
+     * @param maxDelay maximum amount of time waiting before retrying
+     *
+     */
+    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+        this.initialDelay = initialDelay;
+        this.delayPeriod = delayPeriod;
+        this.maxRetries = maxRetries;
+        this.maxDelay = maxDelay;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    @Override
+    public Set<TopicPartition> retriableTopicPartitions() {
+        // TopicPartition does not implement Comparable, so a sorted set cannot be used here
+        final Set<TopicPartition> tps = new HashSet<>();
+        final long currentTimeNanos = System.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+                tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+            } else {
+                break;  // Stop searching as soon as passed current time
+            }
+        }
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
+        return tps;
+    }
+
+    @Override
+    public boolean isReady(KafkaSpoutMessageId msgId) {
+        boolean retry = false;
+        if (toRetryMsgs.contains(msgId)) {
+            final long currentTimeNanos = System.nanoTime();
+            for (RetrySchedule retrySchedule : retrySchedules) {
+                if (retrySchedule.retry(currentTimeNanos)) {
+                    if (retrySchedule.msgId.equals(msgId)) {
+                        retry = true;
+                        LOG.debug("Found entry to retry {}", retrySchedule);
+                    }
+                } else {
+                    LOG.debug("Entry to retry not found {}", retrySchedule);
+                    break;  // Stop searching as soon as passed current time
+                }
+            }
+        }
+        return retry;
+    }
+
+    @Override
+    public boolean isScheduled(KafkaSpoutMessageId msgId) {
+        return toRetryMsgs.contains(msgId);
+    }
+
+    @Override
+    public boolean remove(KafkaSpoutMessageId msgId) {
+        boolean removed = false;
+        if (toRetryMsgs.contains(msgId)) {
+            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                final RetrySchedule retrySchedule = iterator.next();
+                if (retrySchedule.msgId().equals(msgId)) {
+                    iterator.remove();
+                    toRetryMsgs.remove(msgId);
+                    removed = true;
+                    break;
+                }
+            }
+        }
+        LOG.debug(removed ?
"Removed {} " : "Not removed {}", msgId); + LOG.trace("Current state {}", retrySchedules); + return removed; + } + + @Override + public boolean retainAll(Collection topicPartitions) { + boolean result = false; + for (Iterator rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) { + final RetrySchedule retrySchedule = rsIterator.next(); + final KafkaSpoutMessageId msgId = retrySchedule.msgId; + final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition()); + if (!topicPartitions.contains(tpRetry)) { + rsIterator.remove(); + toRetryMsgs.remove(msgId); + LOG.debug("Removed {}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + result = true; + } + } + return result; + } + + @Override + public void schedule(KafkaSpoutMessageId msgId) { + if (msgId.numFails() > maxRetries) { + LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries); + } else { + if (toRetryMsgs.contains(msgId)) { + for (Iterator iterator = retrySchedules.iterator(); iterator.hasNext(); ) { + final RetrySchedule retrySchedule = iterator.next(); + if (retrySchedule.msgId().equals(msgId)) { + iterator.remove(); + toRetryMsgs.remove(msgId); + } + } + } + final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId)); + retrySchedules.add(retrySchedule); + toRetryMsgs.add(msgId); + LOG.debug("Scheduled. {}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + } + } + + // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE + private long nextTime(KafkaSpoutMessageId msgId) { + final long currentTimeNanos = System.nanoTime(); + final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... + ? currentTimeNanos + initialDelay.lengthNanos() + : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1)); + return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos); + } + + @Override + public String toString() { + return "KafkaSpoutRetryExponentialBackoff{" + + "delay=" + initialDelay + + ", ratio=" + delayPeriod + + ", maxRetries=" + maxRetries + + ", maxRetryDelay=" + maxDelay + + '}'; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java new file mode 100644 index 00000000000..5aab1678de1 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Represents the logic that manages the retrial of failed tuples.
+ */
+public interface KafkaSpoutRetryService extends Serializable {
+    /**
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * @param msgId message to schedule for retrial
+     */
+    void schedule(KafkaSpoutMessageId msgId);
+
+    /**
+     * Removes a message from the list of messages scheduled for retrial
+     * @param msgId message to remove from retrial
+     * @return true if the message was removed, false otherwise
+     */
+    boolean remove(KafkaSpoutMessageId msgId);
+
+    /**
+     * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection}.
+     * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed.
+     * This method is useful to cleanup state following partition rebalance.
+     * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages
+     * @return true if at least one message was removed, false otherwise
+     */
+    boolean retainAll(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
+     * for which a tuple has failed and has retry time less than current time
+     */
+    Set<TopicPartition> retriableTopicPartitions();
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
+     * i.e., it is scheduled and has a retry time that is less than the current time.
+     * @return true if the message is ready to be retried, false otherwise
+     */
+    boolean isReady(KafkaSpoutMessageId msgId);
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
+     * The message may or may not be ready to be retried yet.
+     * @return true if the message is scheduled to be retried, regardless of whether or not it is ready to be retried.
+     * Returns false if this message is not scheduled for retrial.
+     */
+    boolean isScheduled(KafkaSpoutMessageId msgId);
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
new file mode 100644
index 00000000000..064a8bbafbb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+
+/**
+ * Represents the stream and output fields used by a topic
+ */
+public class KafkaSpoutStream implements Serializable {
+    private final Fields outputFields;
+    private final String streamId;
+    private final String topic;
+
+    /** Represents the specified outputFields and topic with the default stream */
+    KafkaSpoutStream(Fields outputFields, String topic) {
+        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
+    }
+
+    /** Represents the specified outputFields and topic with the specified stream */
+    KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+        if (outputFields == null || streamId == null || topic == null) {
+            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
+                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
+        }
+        this.outputFields = outputFields;
+        this.streamId = streamId;
+        this.topic = topic;
+    }
+
+    Fields getOutputFields() {
+        return outputFields;
+    }
+
+    String getStreamId() {
+        return streamId;
+    }
+
+    String getTopic() {
+        return topic;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStream{" +
+                "outputFields=" + outputFields +
+                ", streamId='" + streamId + '\'' +
+                ", topic='" + topic + '\'' +
+                '}';
+    }
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
new file mode 100644
index 00000000000..dc5892e038c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
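+ * <br/>
+ * A minimal construction sketch (the topic and field names are hypothetical):
+ * <pre>{@code
+ * KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
+ *         new Fields("topic", "partition", "offset", "key", "value"), "test-topic", "another-topic")
+ *     .build();
+ * }</pre>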
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
new file mode 100644
index 00000000000..dc5892e038c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class);
+
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+        LOG.debug("Built {}", this);
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the declared output fields
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            final Fields outputFields = topicToStream.get(topic).getOutputFields();
+            LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields);
+            return outputFields;
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to which the tuples are emitted
+     */
+    public String getStreamId(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            final String streamId = topicToStream.get(topic).getStreamId();
+            LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId);
+            return streamId;
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            if (!((OutputFieldsGetter) declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
+                declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+                LOG.debug("Declared " + stream);
+            }
+        }
+    }
+
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), tuple, messageId);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStreams{" +
+                "topicToStream=" + topicToStream +
+                '}';
+    }
+
+    public static class Builder {
+        private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
+
+        /**
+         * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
+         * All topics will have the same stream id and output fields.
+         */
+        public Builder(Fields outputFields, String... topics) {
+            addStream(outputFields, topics);
+        }
+
+        /**
+         * Creates a {@link KafkaSpoutStream} with the given stream id for each topic specified.
+         * All the topics will have the same stream id and output fields.
+         */
+        public Builder(Fields outputFields, String streamId, String... topics) {
+            addStream(outputFields, streamId, topics);
+        }
+
+        /**
+         * Creates a builder that adds this stream to the state representing the streams associated with each topic
+         */
+        public Builder(KafkaSpoutStream stream) {
+            addStream(stream);
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated with each topic
+         */
+        public Builder addStream(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String...)}
+         */
+        public Builder addStream(Fields outputFields, String... topics) {
+            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String, String...)}
+         */
+        public Builder addStream(Fields outputFields, String streamId, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+            }
+            return this;
+        }
+
+        public KafkaSpoutStreams build() {
+            return new KafkaSpoutStreams(this);
+        }
+    }
+}
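As a usage sketch of the Builder above (the stream id and topic names are made up for illustration; only the Builder API shown in this file is assumed), several topics can share one stream and one set of output fields:

    import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    import org.apache.storm.tuple.Fields;

    public class KafkaSpoutStreamsExample {
        // Hypothetical wiring: two made-up topics share one made-up stream id
        // and emit the same output fields.
        public static KafkaSpoutStreams buildStreams() {
            return new KafkaSpoutStreams.Builder(
                    new Fields("topic", "partition", "offset", "key", "value"),
                    "orders_stream",                // made-up stream id
                    "orders", "order_updates")      // made-up topics
                .build();
        }
    }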
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
new file mode 100644
index 00000000000..3bb71a859c5
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
+ * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
+ */
+public abstract class KafkaSpoutTupleBuilder<K, V> implements Serializable {
+    private List<String> topics;
+
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public KafkaSpoutTupleBuilder(String... topics) {
+        if (topics == null || topics.length == 0) {
+            throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
+        }
+        this.topics = Arrays.asList(topics);
+    }
+
+    /**
+     * @return list of topics that use this implementation to build tuples
+     */
+    public List<String> getTopics() {
+        return Collections.unmodifiableList(topics);
+    }
+
+    /**
+     * Builds a list of tuples using the {@link ConsumerRecord} specified as parameter
+     * @param consumerRecord record whose contents are used to build tuples
+     * @return list of tuples
+     */
+    public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
+}
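Concrete builders for the test topics appear under tests at the end of this patch (TopicsTest0Test1TupleBuilder and TopicTest2TupleBuilder). As one more hedged sketch (hypothetical class, not part of the patch), a builder that forwards only the record value would be:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
    import org.apache.storm.tuple.Values;

    import java.util.List;

    // Hypothetical sketch: emit the record value as a single-field tuple.
    public class ValueOnlyTupleBuilder extends KafkaSpoutTupleBuilder<String, String> {
        public ValueOnlyTupleBuilder(String... topics) {
            super(topics);
        }

        @Override
        public List<Object> buildTuple(ConsumerRecord<String, String> consumerRecord) {
            return new Values(consumerRecord.value());
        }
    }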
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
new file mode 100644
index 00000000000..d67c69d5b9b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
+ * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances.
+ */
+public class KafkaSpoutTuplesBuilder<K, V> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
+
+    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+    private KafkaSpoutTuplesBuilder(Builder<K, V> builder) {
+        this.topicToTupleBuilders = builder.topicToTupleBuilders;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    public static class Builder<K, V> {
+        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+        @SafeVarargs
+        public Builder(KafkaSpoutTupleBuilder<K, V>... tupleBuilders) {
+            if (tupleBuilders == null || tupleBuilders.length == 0) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+            }
+
+            this.tupleBuilders = Arrays.asList(tupleBuilders);
+            topicToTupleBuilders = new HashMap<>();
+        }
+
+        public KafkaSpoutTuplesBuilder<K, V> build() {
+            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+                for (String topic : tupleBuilder.getTopics()) {
+                    if (!topicToTupleBuilders.containsKey(topic)) {
+                        topicToTupleBuilders.put(topic, tupleBuilder);
+                    }
+                }
+            }
+            return new KafkaSpoutTuplesBuilder<>(this);
+        }
+    }
+
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutTuplesBuilder{" +
+                "topicToTupleBuilders=" + topicToTupleBuilders +
+                '}';
+    }
+}
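The example topology under tests wires several tuple builders together in getTuplesBuilder(). In miniature, and reusing the hypothetical ValueOnlyTupleBuilder from the sketch above (assumed to be in the same package):

    import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;

    public class KafkaSpoutTuplesBuilderExample {
        // Hypothetical wiring: one tuple builder instance can cover several topics;
        // the Builder indexes them by topic, first builder registered for a topic wins.
        public static KafkaSpoutTuplesBuilder<String, String> buildTuplesBuilder() {
            return new KafkaSpoutTuplesBuilder.Builder<>(
                    new ValueOnlyTupleBuilder("orders", "order_updates"))  // hypothetical builder from the sketch above
                .build();
        }
    }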
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 00000000000..7a94a50b0f5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.debug("input = [" + input + "]");
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
new file mode 100644
index 00000000000..0691dd30591
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+public class KafkaSpoutTopologyMain {
+    private static final String[] STREAMS = new String[]{"test_stream", "test1_stream", "test2_stream"};
+    private static final String[] TOPICS = new String[]{"test", "test1", "test2"};
+
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
+        } else {
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
+        }
+    }
+
+    protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("test", config, topology);
+        stopWaitingForInput();
+    }
+
+    protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
+        StormSubmitter.submitTopology(arg, config, topology);
+    }
+
+    private static void stopWaitingForInput() {
+        try {
+            System.out.println("PRESS ENTER TO STOP");
+            new BufferedReader(new InputStreamReader(System.in)).readLine();
+            System.exit(0);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected static Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    public static StormTopology getTopologyKafkaSpout() {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        return tp.createTopology();
+    }
+
+    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
+        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .build();
+    }
+
+    private static KafkaSpoutRetryService getRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS),
+                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+    }
+
+    private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) {
+        return new TimeInterval(delay, timeUnit);
+    }
+
+    public static Map<String, Object> getKafkaConsumerProps() {
+        Map<String, Object> props = new HashMap<>();
+//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilder.Builder<>(
+                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+            .build();
+    }
+
+    public static KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+        final Fields outputFields1 = new Fields("topic", "partition", "offset");
+        return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test and test1 sent to test_stream
+                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
+                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
+                .build();
+    }
+}
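KafkaSpoutTestBolt above logs and acks whole tuples. As a hedged sketch of a bolt that instead consumes the individual fields declared by getKafkaSpoutStreams() on test_stream ("topic", "partition", "offset", "key", "value") — this class is hypothetical and not part of the patch:

    package org.apache.storm.kafka.spout.test;  // hypothetical companion to the test topology

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    import java.util.Map;

    // Hypothetical sketch: print topic/partition@offset -> value for each record.
    public class KafkaRecordPrinterBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            System.out.println(input.getStringByField("topic") + "/"
                    + input.getIntegerByField("partition") + "@"
                    + input.getLongByField("offset") + " -> "
                    + input.getStringByField("value"));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: no output streams
        }
    }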
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
new file mode 100644
index 00000000000..ca65177951b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicTest2TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset());
+    }
+}
\ No newline at end of file
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
new file mode 100644
index 00000000000..4c55aa19c9e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicsTest0Test1TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
\ No newline at end of file
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 43b7796c597..763c15f01c6 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -118,26 +118,12 @@
        <dependency>
            <groupId>org.apache.kafka</groupId>
-           <artifactId>kafka_2.10</artifactId>
-           <version>0.8.2.1</version>
-
+           <artifactId>${kafka.artifact.id}</artifactId>
            <scope>provided</scope>
-           <exclusions>
-               <exclusion>
-                   <groupId>org.apache.zookeeper</groupId>
-                   <artifactId>zookeeper</artifactId>
-               </exclusion>
-               <exclusion>
-                   <groupId>log4j</groupId>
-                   <artifactId>log4j</artifactId>
-               </exclusion>
-           </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
-           <version>0.8.2.1</version>
-           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
index d093ae8437f..ba79ddc686c 100644
--- a/external/storm-solr/pom.xml
+++ b/external/storm-solr/pom.xml
@@ -31,10 +31,10 @@
    <developers>
        <developer>
-           <id>Hugo-Louro</id>
-           <name>Hugo Louro</name>
-           <email>hmclouro@gmail.com</email>
-       </developer>
+           <id>hmcl</id>
+           <name>Hugo Louro</name>
+           <email>hmclouro@gmail.com</email>
+       </developer>
    </developers>
diff --git a/pom.xml b/pom.xml
index 12e5a9f50fd..1a899b36946 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
                <role>Committer</role>
            </roles>
-
+
        </developer>
        <developer>
            <id>afeng</id>
@@ -109,7 +109,7 @@
                <role>Committer</role>
            </roles>
-
+
        </developer>
        <developer>
            <id>jjackson</id>
@@ -249,6 +249,8 @@
        <calcite.version>1.4.0-incubating</calcite.version>
        <jackson.version>2.6.3</jackson.version>
        2.18.1
+       <kafka.version>0.9.0.1</kafka.version>
+       <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
        org.apache.storm.testing.IntegrationTest
@@ -282,6 +284,7 @@
        <module>external/storm-mongodb</module>
        <module>examples/storm-starter</module>
        <module>storm-clojure</module>
+       <module>external/storm-kafka-client</module>
    </modules>
@@ -673,14 +676,14 @@
            <version>${ring-json.version}</version>
        </dependency>
        <dependency>
-         <groupId>org.eclipse.jetty</groupId>
-         <artifactId>jetty-servlet</artifactId>
-         <version>${jetty.version}</version>
+           <groupId>org.eclipse.jetty</groupId>
+           <artifactId>jetty-servlet</artifactId>
+           <version>${jetty.version}</version>
        </dependency>
        <dependency>
-         <groupId>org.eclipse.jetty</groupId>
-         <artifactId>jetty-servlets</artifactId>
-         <version>${jetty.version}</version>
+           <groupId>org.eclipse.jetty</groupId>
+           <artifactId>jetty-servlets</artifactId>
+           <version>${jetty.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
@@ -831,7 +834,7 @@
            <version>${thrift.version}</version>
            <scope>compile</scope>
        </dependency>
-
+
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
@@ -839,14 +842,38 @@
        <dependency>
-         <groupId>org.apache.calcite</groupId>
-         <artifactId>calcite-core</artifactId>
-         <version>${calcite.version}</version>
+           <groupId>org.apache.calcite</groupId>
+           <artifactId>calcite-core</artifactId>
+           <version>${calcite.version}</version>
+       </dependency>
+       <dependency>
+           <groupId>com.fasterxml.jackson.core</groupId>
+           <artifactId>jackson-databind</artifactId>
+           <version>${jackson.version}</version>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.kafka</groupId>
+           <artifactId>${kafka.artifact.id}</artifactId>
+           <version>${kafka.version}</version>
+           <exclusions>
+               <exclusion>
+                   <groupId>org.apache.zookeeper</groupId>
+                   <artifactId>zookeeper</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>log4j</groupId>
+                   <artifactId>log4j</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>org.slf4j</groupId>
+                   <artifactId>slf4j-log4j12</artifactId>
+               </exclusion>
+           </exclusions>
+       </dependency>
        <dependency>
-           <groupId>com.fasterxml.jackson.core</groupId>
-           <artifactId>jackson-databind</artifactId>
-           <version>${jackson.version}</version>
+           <groupId>org.apache.kafka</groupId>
+           <artifactId>kafka-clients</artifactId>
+           <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>uk.org.lidalia</groupId>
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 7f0da6fc62e..648640ea691 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -317,6 +317,20 @@
                <include>README.*</include>
            </includes>
        </fileSet>
+       <fileSet>
+           <directory>${project.basedir}/../../external/storm-kafka-client/target</directory>
+           <outputDirectory>external/storm-kafka-client</outputDirectory>
+           <includes>
+               <include>storm*jar</include>
+           </includes>
+       </fileSet>
+       <fileSet>
+           <directory>${project.basedir}/../../external/storm-kafka-client</directory>
+           <outputDirectory>external/storm-kafka-client</outputDirectory>
+           <includes>
+               <include>README.*</include>
+           </includes>
+       </fileSet>