diff --git a/examples/pom.xml b/examples/pom.xml
index f5ab2a7fdc09..6852ba93cc1d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -102,6 +102,11 @@
       <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-v09_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/v09DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/v09DirectKafkaWordCount.scala
new file mode 100644
index 000000000000..7dca188dd7a1
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/v09DirectKafkaWordCount.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka.v09._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: v09DirectKafkaWordCount <brokers> <topics> <groupId> <offsetReset> <batchInterval> <pollTimeout>
+ *   <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <groupId> is the name of kafka consumer group
+ *   <offsetReset> What to do when there is no initial offset in Kafka or
+ *     if the current offset does not exist any more on the server
+ *     earliest: automatically reset the offset to the earliest offset
+ *     latest: automatically reset the offset to the latest offset
+ *   <batchInterval> is the time interval at which streaming data will be divided into batches
+ *   <pollTimeout> is time, in milliseconds, spent waiting in Kafka consumer poll
+ *     if data is not available
+ * Example:
+ * $ bin/run-example streaming.v09DirectKafkaWordCount broker1-host:port,broker2-host:port \
+ * topic1,topic2 my-consumer-group latest batch-interval pollTimeout
+ */
+object v09DirectKafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 6) {
+ System.err.println(s"""
+ |Usage: v09DirectKafkaWordCount <brokers> <topics> <groupId> <offsetReset> <batchInterval> <pollTimeout>
+ |  <brokers> is a list of one or more Kafka brokers
+ |  <topics> is a list of one or more kafka topics to consume from
+ |  <groupId> is the name of kafka consumer group
+ |  <offsetReset> What to do when there is no initial offset
+ |    in Kafka or if the current offset does not exist
+ |    any more on the server
+ |    earliest: automatically reset the offset
+ |      to the earliest offset
+ |    latest: automatically reset the offset
+ |      to the latest offset
+ |  <batchInterval> is the time interval at which
+ |    streaming data will be divided into batches
+ |  <pollTimeout> is time, in milliseconds, spent waiting in
+ |    Kafka consumer poll if data is not available
+ |
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(brokers, topics, groupId, offsetReset, batchInterval, pollTimeout) = args
+
+ // Create context with the batch interval given on the command line
+ val sparkConf = new SparkConf().setAppName("v09DirectKafkaWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
+
+ // Create direct kafka stream with brokers and topics
+ val topicsSet = topics.split(",").toSet
+ val kafkaParams = Map[String, String](
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
+ ConsumerConfig.GROUP_ID_CONFIG -> groupId,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
+ "spark.kafka.poll.time" -> pollTimeout)
+ val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)
+
+ // Get the lines, split them into words, count the words and print
+ val lines = messages.map(_._2)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
+ wordCounts.print()
+
+ // Start the computation
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+// scalastyle:on println
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index a9ed39ef8c9a..65f65b91182b 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -41,6 +41,11 @@
       <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-v09_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/external/kafka-v09/pom.xml b/external/kafka-v09/pom.xml
new file mode 100644
index 000000000000..8392b12459af
--- /dev/null
+++ b/external/kafka-v09/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-v09_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming-kafka-v09</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Kafka v09</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.101tec</groupId>
+      <artifactId>zkclient</artifactId>
+      <version>0.6</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.2.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>0.9.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/Broker.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/Broker.scala
new file mode 100644
index 000000000000..0826cd027e50
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/Broker.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
+ */
+final class Broker private(
+ /** Broker's hostname */
+ val host: String,
+ /** Broker's port */
+ val port: Int) extends Serializable {
+ override def equals(obj: Any): Boolean = obj match {
+ case that: Broker =>
+ this.host == that.host &&
+ this.port == that.port
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ 41 * (41 + host.hashCode) + port
+ }
+
+ override def toString(): String = {
+ s"Broker($host, $port)"
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Broker]].
+ */
+@Experimental
+object Broker {
+ def create(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def apply(host: String, port: Int): Broker =
+ new Broker(host, port)
+
+ def unapply(broker: Broker): Option[(String, Int)] = {
+ if (broker == null) {
+ None
+ } else {
+ Some((broker.host, broker.port))
+ }
+ }
+}
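Not part of the patch: a minimal, illustrative sketch of how `Broker` round-trips through the companion `apply`/`unapply`, so it can be built and pattern matched without exposing the private constructor (the object name, host and port below are placeholders):

```scala
import org.apache.spark.streaming.kafka.v09.Broker

object BrokerSketch {
  def main(args: Array[String]): Unit = {
    val broker = Broker("broker1.example.com", 9092) // companion apply
    broker match {
      case Broker(host, port) =>                     // companion unapply
        println(s"would connect to $host:$port")
    }
  }
}
```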
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaInputDStream.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaInputDStream.scala
new file mode 100644
index 000000000000..448baf867795
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaInputDStream.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.Logging
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.v09.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.{StreamingContext, Time}
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/**
+ * A stream of [[org.apache.spark.streaming.kafka.v09.KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * configuration parameters.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ R: ClassTag](
+ @transient ssc_ : StreamingContext,
+ val kafkaParams: Map[String, String],
+ @transient val fromOffsets: Map[TopicPartition, Long],
+ messageHandler: ConsumerRecord[K, V] => R
+ ) extends InputDStream[R](ssc_) with Logging {
+
+ val maxRetries = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRetries", 1)
+
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka 0.9 direct stream [$id]"
+
+ protected[streaming] override val checkpointData =
+ new DirectKafkaInputDStreamCheckpointData
+
+
+ /**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
+ protected var kafkaCluster = new KafkaCluster[K, V](kafkaParams)
+
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ "spark.streaming.kafka.maxRatePerPartition", 0)
+
+ protected def maxMessagesPerPartition: Option[Long] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+ val numPartitions = currentOffsets.keys.size
+
+ val effectiveRateLimitPerPartition = estimatedRateLimit
+ .filter(_ > 0)
+ .map { limit =>
+ if (maxRateLimitPerPartition > 0) {
+ Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+ } else {
+ limit / numPartitions
+ }
+ }.getOrElse(maxRateLimitPerPartition)
+
+ if (effectiveRateLimitPerPartition > 0) {
+ val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+ Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
+ } else {
+ None
+ }
+ }
+
+ // temp fix for serialization issue of TopicPartition
+ protected var serCurrentOffsets = fromOffsets.map { case (tp, l) =>
+ (tp.topic, tp.partition, l)
+ }
+
+ @transient
+ protected var currentOffsets: Map[TopicPartition, Long] = null
+
+ protected final def latestLeaderOffsets(): Map[TopicPartition, LeaderOffset] = {
+ kafkaCluster.getLatestOffsetsWithLeaders(currentOffsets.keySet)
+ }
+
+ // limits the maximum number of messages per partition
+ protected def clamp(
+ leaderOffsets: Map[TopicPartition, LeaderOffset]
+ ): Map[TopicPartition, LeaderOffset] = {
+ maxMessagesPerPartition.map { mmp =>
+ leaderOffsets.map { case (tp, lo) =>
+ tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+ }
+ }.getOrElse(leaderOffsets)
+ }
+
+ override def compute(validTime: Time): Option[KafkaRDD[K, V, R]] = {
+ currentOffsets = serCurrentOffsets.map { i => new TopicPartition(i._1, i._2) -> i._3 }.toMap
+ val untilOffsets = clamp(latestLeaderOffsets())
+ val rdd = KafkaRDD[K, V, R](
+ context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+
+ // Report the record number and metadata of this batch interval to InputInfoTracker.
+ val offsetRanges = currentOffsets.map { case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+ }
+ val description = offsetRanges.filter { offsetRange =>
+ // Don't display empty ranges.
+ offsetRange.fromOffset != offsetRange.untilOffset
+ }.map { offsetRange =>
+ s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ }.mkString("\n")
+ // Copy offsetRanges to immutable.List to prevent from being modified by the user
+ val metadata = Map(
+ "offsets" -> offsetRanges.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+ val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+ serCurrentOffsets = untilOffsets.map { kv => (kv._1.topic, kv._1.partition, kv._2.offset) }
+ Some(rdd)
+ }
+
+ override def start(): Unit = {
+ }
+
+ def stop(): Unit = {
+ if (kafkaCluster != null) {
+ kafkaCluster.close()
+ kafkaCluster = null
+ }
+ }
+
+ private[streaming]
+ class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
+
+ override def update(time: Time) {
+ batchForTime.clear()
+ generatedRDDs.foreach { kv =>
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V, R]].offsetRanges.map(_.toTuple).toArray
+ batchForTime += kv._1 -> a
+ }
+ }
+
+ override def cleanup(time: Time) {}
+
+ override def restore() {
+ // this is assuming that the topics don't change during execution, which is true currently
+
+ batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V, R](
+ context.sparkContext, kafkaParams, b.map(OffsetRange(_)), messageHandler)
+ }
+ }
+ }
+
+ /**
+ * A RateController to retrieve the rate from RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
+
+}
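Not part of the patch: a short sketch of the driver-side settings this DStream reads, namely the `spark.streaming.kafka.maxRatePerPartition` cap consumed by `maxMessagesPerPartition` and the standard backpressure flag checked via `RateController.isBackPressureEnabled`; the object name and values are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RateLimitConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("RateLimitConfigSketch")
      // Hard cap read by maxMessagesPerPartition above, in records per second
      // and per partition (0 disables the cap).
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      // Enables the backpressure RateController, which can lower the
      // effective per-partition rate below the cap.
      .set("spark.streaming.backpressure.enabled", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // ... build the direct stream with KafkaUtils and start ssc as usual ...
  }
}
```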
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala
new file mode 100644
index 000000000000..c4ff717329d6
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import java.util
+import java.util.Collections
+
+import org.apache.kafka.clients.consumer.{OffsetResetStrategy, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.spark.SparkException
+
+import scala.collection.JavaConverters._
+import scala.reflect._
+
+/**
+ * @param kafkaParams Kafka
+ * configuration parameters.
+ * Requires "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, String])
+ extends Serializable {
+
+ import KafkaCluster.LeaderOffset
+
+ @transient
+ protected var consumer: KafkaConsumer[K, V] = null
+
+ def getLatestOffsets(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
+ getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.LATEST)
+ }
+
+ def getEarliestOffsets(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
+ getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.EARLIEST)
+ }
+
+ def getPartitions(topics: Set[String]): Set[TopicPartition] = {
+ withConsumer { consumer => {
+ val partInfo = topics.flatMap {
+ topic => Option(consumer.partitionsFor(topic)) match {
+ case None => throw new SparkException("Topic doesn't exist " + topic)
+ case Some(partInfoList) => partInfoList.asScala.toList
+ }
+ }
+ val topicPartitions: Set[TopicPartition] = partInfo.map { partition =>
+ new TopicPartition(partition.topic(), partition.partition())
+ }
+ topicPartitions
+ }
+ }.asInstanceOf[Set[TopicPartition]]
+ }
+
+ def getPartitionsLeader(topics: Set[String]): Map[TopicPartition, String] = {
+ getPartitionInfo(topics).map { pi =>
+ new TopicPartition(pi.topic, pi.partition) -> pi.leader.host
+ }.toMap
+ }
+
+ def getPartitionInfo(topics: Set[String]): Set[PartitionInfo] = {
+ withConsumer { consumer =>
+ topics.flatMap { topic =>
+ Option(consumer.partitionsFor(topic)) match {
+ case None =>
+ throw new SparkException("Topic doesn't exist " + topic)
+ case Some(piList) => piList.asScala.toList
+ }
+ }
+ }.asInstanceOf[Set[PartitionInfo]]
+ }
+
+ def setConsumerOffsets(offsets: Map[TopicPartition, Long]): Unit = {
+ val topicPartOffsets = new util.HashMap[TopicPartition, OffsetAndMetadata]()
+ val topicPartition = offsets.map(tpl => tpl._1).toSeq
+
+ withConsumer(consumer => {
+ consumer.assign(Collections.emptyList[TopicPartition])
+ consumer.assign(topicPartition.asJava)
+
+ for ((topicAndPart, offset) <- offsets) {
+ val topicPartition = topicAndPart
+ val offsetAndMetadata = new OffsetAndMetadata(offset)
+ topicPartOffsets.put(topicPartition, offsetAndMetadata)
+ }
+
+ consumer.commitSync(topicPartOffsets)
+ })
+ }
+
+ def getCommittedOffsets(topicPartitions: Set[TopicPartition]):
+ Map[TopicPartition, Long] = {
+ withConsumer(consumer => {
+ consumer.assign(topicPartitions.toList.asJava)
+ topicPartitions.map( tp => {
+ val offsetAndMetadata = consumer.committed(tp)
+ Option(offsetAndMetadata) match {
+ case None => throw new SparkException(s"Topic $tp hasn't committed offsets")
+ case Some(om) => tp -> om.offset()
+ }
+ }
+ ).toMap
+ }).asInstanceOf[Map[TopicPartition, Long]]
+ }
+
+ def getLatestOffsetsWithLeaders(
+ topicPartitions: Set[TopicPartition]
+ ): Map[TopicPartition, LeaderOffset] = {
+ getOffsets(topicPartitions, OffsetResetStrategy.LATEST)
+ }
+
+ private def getOffsetsWithoutLeaders(
+ topicPartitions: Set[TopicPartition],
+ offsetResetType: OffsetResetStrategy
+ ): Map[TopicPartition, Long] = {
+ getOffsets(topicPartitions, offsetResetType)
+ .map { t => (t._1, t._2.offset) }
+ }
+
+ def getOffsets(topicPartitions: Set[TopicPartition], resetStrategy: OffsetResetStrategy):
+ Map[TopicPartition, LeaderOffset] = {
+ val topics = topicPartitions.map { _.topic }
+ withConsumer{ consumer =>
+ val tplMap = topics.flatMap { topic =>
+ Option(consumer.partitionsFor(topic)) match {
+ case None =>
+ throw new SparkException("Topic doesnt exist " + topic)
+ case Some(piList) => piList.asScala.toList
+ }
+ }.map { pi =>
+ new TopicPartition(pi.topic, pi.partition) -> pi.leader.host
+ }.toMap
+
+ consumer.assign(topicPartitions.toList.asJava)
+ resetStrategy match {
+ case OffsetResetStrategy.EARLIEST => consumer.seekToBeginning(topicPartitions.toList: _*)
+ case OffsetResetStrategy.LATEST => consumer.seekToEnd(topicPartitions.toList: _*)
+ case _ => throw new SparkException("Unknown OffsetResetStrategy " + resetStrategy)
+ }
+ topicPartitions.map { tp =>
+ val pos = consumer.position(tp)
+ tp -> new LeaderOffset(tplMap(tp), pos)
+ }.toMap
+
+ }.asInstanceOf[Map[TopicPartition, LeaderOffset]]
+ }
+
+ private def withConsumer(fn: KafkaConsumer[K, V] => Any): Any = {
+ if (consumer == null) {
+ consumer = new KafkaConsumer[K, V](kafkaParams.asInstanceOf[Map[String, Object]].asJava)
+ }
+ fn(consumer)
+ }
+
+ def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ consumer = null
+ }
+ }
+
+}
+
+private[spark]
+object KafkaCluster {
+
+ private[spark]
+ case class LeaderOffset(host: String, offset: Long)
+}
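Not part of the patch: an illustrative sketch of the `KafkaCluster` offset helpers. Note the class is `private[spark]`, so code like this only compiles inside the `org.apache.spark` package tree (as the suites in this module do); the object name, broker address, group id and topic name are placeholders.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka.v09.KafkaCluster

object KafkaClusterSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; deserializers are needed because
    // KafkaCluster builds a KafkaConsumer[K, V] from these params.
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "offset-inspector",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer")

    val kc = new KafkaCluster[String, String](kafkaParams)
    try {
      val partitions: Set[TopicPartition] = kc.getPartitions(Set("topic1"))
      val latest = kc.getLatestOffsets(partitions) // TopicPartition -> next offset
      latest.foreach { case (tp, offset) => println(s"$tp -> $offset") }
    } finally {
      kc.close()
    }
  }
}
```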
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDD.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDD.scala
new file mode 100644
index 000000000000..debbc9c3352e
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDD.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import java.util.{ Collections, Properties }
+
+import org.apache.kafka.clients.consumer.{ ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.partial.{ BoundedDouble, PartialResult }
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.v09.KafkaCluster.{LeaderOffset}
+import org.apache.spark.util.NextIterator
+import org.apache.spark.{ Logging, Partition, SparkContext, TaskContext }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+import scala.collection.JavaConverters._
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ */
+private[kafka]
+class KafkaRDD[K: ClassTag, V: ClassTag, R: ClassTag] private[spark] (
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ val offsetRanges: Array[OffsetRange],
+ messageHandler: ConsumerRecord[K, V] => R
+ ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+ private val KAFKA_DEFAULT_POLL_TIME: String = "0"
+ private val pollTime = kafkaParams.get("spark.kafka.poll.time")
+ .getOrElse(KAFKA_DEFAULT_POLL_TIME).toInt
+ private val cluster = new KafkaCluster[K, V](kafkaParams)
+
+ override def getPartitions: Array[Partition] = {
+ offsetRanges.zipWithIndex.map {
+ case (o, i) =>
+ new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.leaderHost)
+ }.toArray
+ }
+
+ override def count(): Long = offsetRanges.map(_.count).sum
+
+ override def countApprox(
+ timeout: Long,
+ confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+ val c = count
+ new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+ }
+
+ override def isEmpty(): Boolean = count == 0L
+
+ override def take(num: Int): Array[R] = {
+ val nonEmptyPartitions = this.partitions
+ .map(_.asInstanceOf[KafkaRDDPartition])
+ .filter(_.count > 0)
+
+ if (num < 1 || nonEmptyPartitions.size < 1) {
+ return new Array[R](0)
+ }
+
+ // Determine in advance how many messages need to be taken from each partition
+ val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+ val remain = num - result.values.sum
+ if (remain > 0) {
+ val taken = Math.min(remain, part.count)
+ result + (part.index -> taken.toInt)
+ } else {
+ result
+ }
+ }
+
+ val buf = new ArrayBuffer[R]
+ val res = context.runJob(
+ this,
+ (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
+ parts.keys.toArray)
+ res.foreach(buf ++= _)
+ buf.toArray
+ }
+
+ override def getPreferredLocations(thePart: Partition): Seq[String] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ // TODO is additional hostname resolution necessary here
+ if (part.host != null ) {
+ Seq(part.host)
+ }
+ else {
+ Seq()
+ }
+ }
+
+ private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+ s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition}. " +
+ "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+ private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+ s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates that messages may have been lost"
+
+ private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+ s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+ s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+ " This should not happen, and indicates a message may have been skipped"
+
+ override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+ val part = thePart.asInstanceOf[KafkaRDDPartition]
+ assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ if (part.fromOffset == part.untilOffset) {
+ log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+ s"skipping ${part.topic} ${part.partition}")
+ Iterator.empty
+ } else {
+ new KafkaRDDIterator(part, context)
+ }
+ }
+
+ private class KafkaRDDIterator(
+ part: KafkaRDDPartition,
+ context: TaskContext) extends NextIterator[R] {
+
+ context.addTaskCompletionListener { context => closeIfNeeded() }
+
+ log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ val consumer = new KafkaConsumer[K, V](props)
+ val tp = new TopicPartition(part.topic, part.partition)
+ consumer.assign(Collections.singletonList[TopicPartition](tp))
+
+ var requestOffset = part.fromOffset
+ var iter: Iterator[ConsumerRecord[K, V]] = null
+ consumer.seek(tp, requestOffset)
+
+ override def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
+
+ private def fetchBatch: Iterator[ConsumerRecord[K, V]] = {
+ consumer.seek(new TopicPartition(part.topic, part.partition), requestOffset)
+ val recs = consumer.poll(pollTime)
+ recs.records(new TopicPartition(part.topic, part.partition)).iterator().asScala
+ }
+
+ override def getNext(): R = {
+ if ( requestOffset == part.untilOffset ) {
+ finished = true
+ return null.asInstanceOf[R]
+ }
+
+ if (iter == null || !iter.hasNext) {
+ iter = fetchBatch
+ }
+
+ if (!iter.hasNext) {
+ if ( requestOffset < part.untilOffset ) {
+ return getNext()
+ }
+ assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ val item: ConsumerRecord[K, V] = iter.next()
+ if (item.offset >= part.untilOffset) {
+ assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
+ finished = true
+ null.asInstanceOf[R]
+ } else {
+ requestOffset = item.offset() + 1
+ messageHandler(item)
+ }
+ }
+ }
+ }
+
+}
+
+private[kafka]
+object KafkaRDD {
+
+ /**
+ * @param kafkaParams Kafka
+ * configuration parameters.
+ * Requires "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the batch
+ * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+ * ending point of the batch
+ */
+ def apply[K: ClassTag, V: ClassTag, R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicPartition, Long],
+ untilOffsets: Map[TopicPartition, LeaderOffset],
+ messageHandler: ConsumerRecord[K, V] => R): KafkaRDD[K, V, R] = {
+ val offsetRanges = fromOffsets.map {
+ case (tp, fo) =>
+ val uo = untilOffsets(tp)
+ OffsetRange(tp.topic, tp.partition, fo, uo.offset, uo.host)
+ }.toArray
+
+ new KafkaRDD[K, V, R](sc, kafkaParams, offsetRanges, messageHandler)
+ }
+}
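Not part of the patch: a minimal batch-read sketch using `KafkaUtils.createRDD` with an explicit `OffsetRange`, which is how a `KafkaRDD` like the one above is normally obtained. The object name, broker, topic and offsets are placeholders, and the requested offsets must actually exist in Kafka (otherwise the offset check in `KafkaUtils` throws).

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.v09.{KafkaUtils, OffsetRange}

object KafkaRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDSketch"))

    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "batch-reader",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer")

    // Read offsets [0, 100) of partition 0 of "topic1" as (key, value) pairs.
    val ranges = Array(OffsetRange("topic1", 0, 0L, 100L))
    val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, ranges)
    rdd.map(_._2).take(10).foreach(println)

    sc.stop()
  }
}
```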
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDPartition.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDPartition.scala
new file mode 100644
index 000000000000..b3889aae9063
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDPartition.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import org.apache.spark.Partition
+
+/** @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ */
+private[kafka]
+class KafkaRDDPartition(
+ val index: Int,
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long,
+ val host: String
+ ) extends Partition {
+ /** Number of messages this partition refers to */
+ def count(): Long = untilOffset - fromOffset
+}
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaTestUtils.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaTestUtils.scala
new file mode 100644
index 000000000000..c5baaeca9e18
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaTestUtils.scala
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.concurrent.TimeoutException
+import java.util.{Map => JMap, Properties}
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put the Kafka test utility class in src is so that it can also be used to test the Python related Kafka APIs.
+ */
+private[kafka] class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkClient: ZkClient = _
+ private var zkUtils: ZkUtils = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: Producer[String, String] = _
+
+ // Flag to test whether the system is correctly started
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkClient = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkClient).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkClient = ZkUtils.createZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout)
+ zkUtils = ZkUtils(zkClient, JaasUtils.isZkSecurityEnabled())
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ (server, port)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** Set up the embedded servers, including Zookeeper and the Kafka broker */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Tear down the embedded servers, including the Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
+
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (zkUtils != null) {
+ zkUtils.close()
+ zkUtils = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String): Unit = {
+ AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ // wait until metadata is propagated
+ waitUntilMetadataIsPropagated(topic, 0)
+ }
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Unit = {
+ producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+ producer.send(messages.map {
+ new KeyedMessage[String, String](topic, _)
+ }: _*)
+ producer.close()
+ producer = null
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("metadata.broker.list", brokerAddress)
+ props.put("serializer.class", classOf[StringEncoder].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("request.required.acks", "-1")
+ props
+ }
+
+ // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
+ // dependency
+ def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+ def makeAttempt(): Either[Throwable, T] = {
+ try {
+ Right(func)
+ } catch {
+ case e if NonFatal(e) => Left(e)
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ @tailrec
+ def tryAgain(attempt: Int): T = {
+ makeAttempt() match {
+ case Right(result) => result
+ case Left(e) =>
+ val duration = System.currentTimeMillis() - startTime
+ if (duration < timeout.milliseconds) {
+ Thread.sleep(interval.milliseconds)
+ } else {
+ throw new TimeoutException(e.getMessage)
+ }
+
+ tryAgain(attempt + 1)
+ }
+ }
+
+ tryAgain(1)
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.size >= 1
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ Utils.deleteRecursively(snapshotDir)
+ Utils.deleteRecursively(logDir)
+ }
+ }
+
+}
+
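Not part of the patch: a sketch of how a suite inside this package might drive the embedded servers (the package declaration is required because `KafkaTestUtils` is `private[kafka]`); the object name, topic name and payloads are placeholders.

```scala
package org.apache.spark.streaming.kafka.v09

object KafkaTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    val kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup() // embedded Zookeeper + Kafka broker
    try {
      kafkaTestUtils.createTopic("test-topic")
      kafkaTestUtils.sendMessages("test-topic", Map("a" -> 2, "b" -> 3))
      println(s"broker listening at ${kafkaTestUtils.brokerAddress}")
    } finally {
      kafkaTestUtils.teardown()
    }
  }
}
```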
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaUtils.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaUtils.scala
new file mode 100644
index 000000000000..00bafa2495b8
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaUtils.scala
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import java.io.OutputStream
+import java.lang.{Integer => JInt, Long => JLong}
+
+import java.util.{List => JList}
+import java.util.{Map => JMap}
+import java.util.{Set => JSet}
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SslConfigs
+
+import scala.reflect.ClassTag
+
+import com.google.common.base.Charsets.UTF_8
+import kafka.common.TopicAndPartition
+import kafka.serializer.Decoder
+import net.razorvine.pickle.{Opcodes, Pickler, IObjectPickler}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.{SSLOptions, SparkContext, SparkException}
+
+import scala.collection.JavaConverters._
+import scala.reflect._
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java._
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+
+object KafkaUtils {
+
+ def addSSLOptions(
+ kafkaParams: Map[String, String],
+ sc: SparkContext
+ ): Map[String, String] = {
+
+ val sparkConf = sc.getConf
+ val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", None)
+ val kafkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.kafka", Some(defaultSSLOptions))
+
+ if (kafkaSSLOptions.enabled) {
+ val sslParams = Map[String, Option[_]](
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> Some("SSL"),
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> kafkaSSLOptions.trustStore,
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.trustStorePassword,
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> kafkaSSLOptions.keyStore,
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.keyStorePassword,
+ SslConfigs.SSL_KEY_PASSWORD_CONFIG -> kafkaSSLOptions.keyPassword
+ )
+ kafkaParams ++ sslParams.filter(_._2.isDefined).mapValues(_.get.toString)
+ } else {
+ kafkaParams
+ }
+
+ }
+
+ /** Make sure offsets are available in kafka, or throw an exception */
+ private def checkOffsets(
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {
+ val kc = new KafkaCluster(kafkaParams)
+ try {
+ val topics = offsetRanges.map(_.topicPartition).toSet
+ val low = kc.getEarliestOffsets(topics)
+ val high = kc.getLatestOffsetsWithLeaders(topics)
+
+ val result = offsetRanges.filterNot { o =>
+ low(o.topicPartition()) <= o.fromOffset &&
+ o.untilOffset <= high(o.topicPartition()).offset
+ }
+
+ if (!result.isEmpty) {
+ throw new SparkException("Offsets not available in Kafka: " + result.mkString(","))
+ }
+
+ offsetRanges.map { o =>
+ OffsetRange(o.topic, o.partition, o.fromOffset, o.untilOffset,
+ high(o.topicPartition()).host)
+ }
+ }
+ finally {
+ kc.close()
+ }
+ }
+
+
+ def createRDD[K: ClassTag, V: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): RDD[(K, V)] = sc.withScope {
+ val messageHandler = (cr: ConsumerRecord[K, V]) => (cr.key, cr.value)
+ new KafkaRDD[K, V, (K, V)](
+ sc,
+ addSSLOptions(kafkaParams, sc),
+ checkOffsets(kafkaParams, offsetRanges),
+ messageHandler
+ )
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
+ * @param sc SparkContext object
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ def createRDD[K: ClassTag, V: ClassTag, R: ClassTag](
+ sc: SparkContext,
+ kafkaParams: Map[String, String],
+ offsetRanges: Array[OffsetRange],
+ messageHandler: ConsumerRecord[K, V] => R
+ ): RDD[R] = sc.withScope {
+ val kc = new KafkaCluster[K, V](addSSLOptions(kafkaParams, sc))
+ val cleanedHandler = sc.clean(messageHandler)
+ new KafkaRDD[K, V, R](sc,
+ addSSLOptions(kafkaParams, sc),
+ checkOffsets(kafkaParams, offsetRanges),
+ cleanedHandler
+ )
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * specified in host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange]
+ ): JavaPairRDD[K, V] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ new JavaPairRDD(createRDD[K, V](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
+ }
+
+ /**
+ * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you to
+ * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+ * as the metadata.
+ *
+ * @param jsc JavaSparkContext object
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * specified in host1:port1,host2:port2 form.
+ * @param offsetRanges Each OffsetRange in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+ jsc: JavaSparkContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ offsetRanges: Array[OffsetRange],
+ messageHandler: JFunction[ConsumerRecord[K, V], R]
+ ): JavaRDD[R] = jsc.sc.withScope {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ createRDD[K, V, R](
+ jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, messageHandler.call _)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.v09.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+ * host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ def createDirectStream[K: ClassTag, V: ClassTag, R: ClassTag](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ fromOffsets: Map[TopicPartition, Long],
+ messageHandler: ConsumerRecord[K, V] => R
+ ): InputDStream[R] = {
+ val cleanedHandler = ssc.sc.clean(messageHandler)
+ new DirectKafkaInputDStream[K, V, R](
+ ssc, addSSLOptions(kafkaParams, ssc.sparkContext), fromOffsets, cleanedHandler)
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.v09.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param ssc StreamingContext object
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set to
+ * "earliest" or "latest" to determine where the stream starts
+ * (defaults to "latest")
+ * @param topics Names of the topics to consume
+ */
+ def createDirectStream[K: ClassTag, V: ClassTag](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): InputDStream[(K, V)] = {
+ val messageHandler = (cr: ConsumerRecord[K, V]) => (cr.key, cr.value)
+ val fromOffsets = getFromOffsets(kafkaParams, topics)
+
+ new DirectKafkaInputDStream[K, V, (K, V)](
+ ssc, addSSLOptions(kafkaParams, ssc.sparkContext), fromOffsets, messageHandler)
+ }
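Not part of the patch: a minimal sketch of the subscribe-by-topic overload combined with the `HasOffsetRanges` access pattern mentioned in the notes above; the object name, broker address, group id and topic name are placeholders.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.v09.{HasOffsetRanges, KafkaUtils}

object DirectStreamOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("DirectStreamOffsetsSketch"), Seconds(2))

    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "offsets-demo",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, kafkaParams, Set("topic1"))

    stream.foreachRDD { rdd =>
      // Each batch RDD is a KafkaRDD, so its offset ranges are recoverable
      // on the driver before any transformation is applied.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```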
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.v09.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param recordClass Class of the records in DStream
+ * @param kafkaParams Kafka
+ * configuration parameters. Requires "bootstrap.servers"
+ * specified in host1:port1,host2:port2 form.
+ * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
+ * starting point of the stream
+ * @param messageHandler Function for translating each message and metadata into the desired type
+ */
+ def createDirectStream[K, V, R](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ recordClass: Class[R],
+ kafkaParams: JMap[String, String],
+ fromOffsets: JMap[TopicPartition, JLong],
+ messageHandler: JFunction[ConsumerRecord[K, V], R]
+ ): JavaInputDStream[R] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
+ createDirectStream[K, V, R](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(fromOffsets.asScala.mapValues {
+ _.longValue()
+ }.toSeq: _*),
+ cleanedHandler
+ )
+ }
+
+ /**
+ * Create an input stream that directly pulls messages from Kafka Brokers
+ * without using any receiver. This stream can guarantee that each message
+ * from Kafka is included in transformations exactly once (see points below).
+ *
+ * Points to note:
+ * - No receivers: This stream does not use any receiver. It directly queries Kafka
+ * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ * by the stream itself.
+ * You can access the offsets used in each batch from the generated RDDs (see
+ * [[org.apache.spark.streaming.kafka.v09.HasOffsetRanges]]).
+ * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
+ * in the [[StreamingContext]]. The information on consumed offset can be
+ * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
+ * - End-to-end semantics: This stream ensures that every record is effectively received and
+ * transformed exactly once, but gives no guarantees on whether the transformed data are
+ * output exactly once. For end-to-end exactly-once semantics, you have to either ensure
+ * that the output operation is idempotent, or use transactions to output records atomically.
+ * See the programming guide for more details.
+ *
+ * @param jssc JavaStreamingContext object
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param kafkaParams Kafka configuration parameters. Requires "bootstrap.servers"
+ * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+ * host1:port1,host2:port2 form.
+ * If not starting from a checkpoint, "auto.offset.reset" may be set
+ * to "latest" or "earliest" to determine where the stream starts
+ * (defaults to "latest")
+ * @param topics Names of the topics to consume
+ */
+ def createDirectStream[K, V](
+ jssc: JavaStreamingContext,
+ keyClass: Class[K],
+ valueClass: Class[V],
+ kafkaParams: JMap[String, String],
+ topics: JSet[String]
+ ): JavaPairInputDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+ implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+ createDirectStream[K, V](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Set(topics.asScala.toSeq: _*)
+ )
+ }
+
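+ /** Java-friendly factory for [[OffsetRange]] instances. */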
+ def createOffsetRange(
+ topic: String,
+ partition: JInt,
+ fromOffset: JLong,
+ untilOffset: JLong
+ ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
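+ /** Java-friendly factory for Kafka's `TopicAndPartition`. */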
+ def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+ TopicAndPartition(topic, partition)
+
+ private[kafka] def getFromOffsets(
+ kafkaParams: Map[String, String],
+ topics: Set[String]
+ ): Map[TopicPartition, Long] = {
+ val kc = new KafkaCluster(kafkaParams)
+ try {
+ val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
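+ // Start from the earliest offsets only when explicitly requested; otherwise default to latest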
+ if (reset == Some("earliest")) {
+ kc.getEarliestOffsets(kc.getPartitions(topics))
+ } else {
+ kc.getLatestOffsets(kc.getPartitions(topics))
+ }
+ } finally {
+ kc.close()
+ }
+ }
+}
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor helps instantiate this class from the Class object
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
+ * takes care of known parameters instead of passing them from Python
+ */
+private[kafka] class KafkaUtilsPythonHelper {
+ import KafkaUtilsPythonHelper._
+
+ def createRDDWithoutMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange]): JavaRDD[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (cr: ConsumerRecord[Array[Byte], Array[Byte]]) => (cr.key, cr.value)
+ new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, messageHandler))
+ }
+
+ def createRDDWithMessageHandler(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange]): JavaRDD[Array[Byte]] = {
+ val messageHandler = (cr: ConsumerRecord[Array[Byte], Array[Byte]]) =>
+ new PythonConsumerRecord(
+ cr.topic, cr.partition, cr.offset, cr.key(), cr.value())
+ val rdd = createRDD(jsc, kafkaParams, offsetRanges, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaRDD(rdd)
+ }
+
+ private def createRDD[V: ClassTag](
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ messageHandler: ConsumerRecord[Array[Byte], Array[Byte]] => V): RDD[V] = {
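+ // Python works with raw bytes, so always force byte-array deserializers here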
+ kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "ByteArrayDeserializer")
+ kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "ByteArrayDeserializer")
+ KafkaUtils.createRDD[Array[Byte], Array[Byte], V](
+ jsc.sc,
+ kafkaParams.asScala.toMap,
+ offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+ messageHandler
+ )
+ }
+
+ def createDirectStreamWithoutMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+ val messageHandler =
+ (cr: ConsumerRecord[Array[Byte], Array[Byte]]) => (cr.key, cr.value)
+ new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
+ }
+
+ def createDirectStreamWithMessageHandler(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicPartition, JLong]): JavaDStream[Array[Byte]] = {
+ val messageHandler = (cr: ConsumerRecord[Array[Byte], Array[Byte]]) =>
+ new PythonConsumerRecord(cr.topic, cr.partition, cr.offset, cr.key(), cr.value())
+ val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
+ mapPartitions(picklerIterator)
+ new JavaDStream(stream)
+ }
+
+ private def createDirectStream[V: ClassTag](
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicPartition, JLong],
+ messageHandler: ConsumerRecord[Array[Byte], Array[Byte]] => V): DStream[V] = {
+
+ val currentFromOffsets = if (!fromOffsets.isEmpty) {
+ val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
+ if (topicsFromOffsets != topics.asScala.toSet) {
+ throw new IllegalStateException(
+ s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
+ s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
+ }
+ Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
+ } else {
+ KafkaUtils.getFromOffsets(
+ Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
+ }
+
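+ // As in createRDD above, force byte-array deserializers since the records are pickled for Python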
+ kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "ByteArrayDeserializer")
+ kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "ByteArrayDeserializer")
+ KafkaUtils.createDirectStream[Array[Byte], Array[Byte], V](
+ jssc.ssc,
+ Map(kafkaParams.asScala.toSeq: _*),
+ Map(currentFromOffsets.toSeq: _*),
+ messageHandler)
+ }
+
+ def createOffsetRange(
+ topic: String,
+ partition: JInt,
+ fromOffset: JLong,
+ untilOffset: JLong
+ ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+ def createTopicAndPartition(topic: String, partition: JInt): TopicPartition =
+ new TopicPartition(topic, partition)
+
+ def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+ val parentRDDs = rdd.getNarrowAncestors
+ val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _]])
+
+ require(
+ kafkaRDDs.length == 1,
+ "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
+ "with this RDD, please call this method only on a Kafka RDD.")
+
+ val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _]]
+ kafkaRDD.offsetRanges.toSeq.asJava
+ }
+}
+
+private object KafkaUtilsPythonHelper {
+ private var initialized = false
+
+ def initialize(): Unit = {
+ SerDeUtil.initialize()
+ synchronized {
+ if (!initialized) {
+ new PythonConsumerRecordPickler().register()
+ initialized = true
+ }
+ }
+ }
+
+ initialize()
+
+ def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
+ new SerDeUtil.AutoBatchedPickler(iter)
+ }
+
+ case class PythonConsumerRecord(
+ topic: String,
+ partition: JInt,
+ offset: JLong,
+ key: Array[Byte],
+ message: Array[Byte])
+
+ class PythonConsumerRecordPickler extends IObjectPickler {
+ private val module = "pyspark.streaming.kafka"
+
+ def register(): Unit = {
+ Pickler.registerCustomPickler(classOf[PythonConsumerRecord], this)
+ Pickler.registerCustomPickler(this.getClass, this)
+ }
+
+ def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
+ if (obj == this) {
+ out.write(Opcodes.GLOBAL)
+ out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8))
+ } else {
+ pickler.save(this)
+ val msgAndMetaData = obj.asInstanceOf[PythonConsumerRecord]
+ out.write(Opcodes.MARK)
+ pickler.save(msgAndMetaData.topic)
+ pickler.save(msgAndMetaData.partition)
+ pickler.save(msgAndMetaData.offset)
+ pickler.save(msgAndMetaData.key)
+ pickler.save(msgAndMetaData.message)
+ out.write(Opcodes.TUPLE)
+ out.write(Opcodes.REDUCE)
+ }
+ }
+ }
+}
diff --git a/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/OffsetRange.scala b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/OffsetRange.scala
new file mode 100644
index 000000000000..20672ac4745c
--- /dev/null
+++ b/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/OffsetRange.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ * ...
+ * }
+ * }}}
+ */
+trait HasOffsetRanges {
+ def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
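+ * For example (topic name and offsets are illustrative):
+ * {{{
+ *   // covers offsets 100 (inclusive) through 200 (exclusive) of partition 0
+ *   val range = OffsetRange.create("topicA", 0, 100, 200)
+ * }}}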
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+ val topic: String,
+ val partition: Int,
+ val fromOffset: Long,
+ val untilOffset: Long,
+ val leaderHost: String) extends Serializable {
+ import OffsetRange.OffsetRangeTuple
+
+ def this(
+ topic: String,
+ partition: Int,
+ fromOffset: Long,
+ untilOffset: Long
+ ) = {
+ this(topic, partition, fromOffset, untilOffset, null)
+ }
+
+ /** Kafka TopicAndPartition object, for convenience */
+ def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
+
+ def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+ /** Number of messages this OffsetRange refers to */
+ def count(): Long = untilOffset - fromOffset
+
+ override def equals(obj: Any): Boolean = obj match {
+ case that: OffsetRange =>
+ this.topic == that.topic &&
+ this.partition == that.partition &&
+ this.fromOffset == that.fromOffset &&
+ this.untilOffset == that.untilOffset
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ toTuple.hashCode()
+ }
+
+ override def toString(): String = {
+ s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+ }
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[streaming]
+ def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+ def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+ def create(
+ topicAndPartition: TopicAndPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+ def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+
+ def apply(
+ topic: String,
+ partition: Int,
+ fromOffset: Long,
+ untilOffset: Long,
+ leaderHost: String): OffsetRange =
+ new OffsetRange(topic, partition, fromOffset, untilOffset, leaderHost)
+
+ def apply(
+ topicPartition: TopicPartition,
+ fromOffset: Long,
+ untilOffset: Long): OffsetRange =
+ new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+ /** this is to avoid ClassNotFoundException during checkpoint restore */
+ private[kafka]
+ type OffsetRangeTuple = (String, Int, Long, Long)
+
+ private[kafka]
+ def apply(t: OffsetRangeTuple) =
+ new OffsetRange(t._1, t._2, t._3, t._4)
+}
diff --git a/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaDirectKafkaStreamSuite.java b/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 000000000000..71e0e9203d64
--- /dev/null
+++ b/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ }
+
+ @After
+ public void tearDown() {
+ if (ssc != null) {
+ ssc.stop();
+ ssc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaStream() throws InterruptedException {
+ final String topic1 = "topic1_testKafkaDirectStream";
+ final String topic2 = "topic2_testKafkaDirectStream";
+ // hold a reference to the current offset ranges, so it can be used downstream
+ final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ String[] topic1data = createTopicAndSendData(topic1);
+ String[] topic2data = createTopicAndSendData(topic2);
+
+ Set<String> sent = new HashSet<>();
+ sent.addAll(Arrays.asList(topic1data));
+ sent.addAll(Arrays.asList(topic2data));
+
+ Map<String, String> kafkaParams = new HashMap<>();
+ kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestUtils.brokerAddress());
+ kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaParams.put("spark.kafka.poll.time", "1000");
+
+ JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ kafkaParams,
+ topicToSet(topic1)
+ ).transformToPair(
+ // Make sure you can get offset ranges from the rdd
+ new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
+ @Override
+ public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ Assert.assertEquals(topic1, offsets[0].topic());
+ return rdd;
+ }
+ }
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> kv) {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+ ssc,
+ String.class,
+ String.class,
+ String.class,
+ kafkaParams,
+ topicOffsetToMap(topic2, 0L),
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
+ return consumerRecord.value();
+ }
+ }
+ );
+ JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+ final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
+ unifiedStream.foreachRDD(
+ new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd) {
+ result.addAll(rdd.collect());
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
+ return null;
+ }
+ }
+ );
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ boolean matches = false;
+ while (!matches && System.currentTimeMillis() - startTime < 20000) {
+ matches = sent.size() == result.size();
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(sent, result);
+ ssc.stop();
+ }
+
+ private static Set<String> topicToSet(String topic) {
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add(topic);
+ return topicSet;
+ }
+
+ private static Map<TopicPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+ Map<TopicPartition, Long> topicMap = new HashMap<>();
+ topicMap.put(new TopicPartition(topic, 0), offsetToStart);
+ return topicMap;
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = {topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaKafkaRDDSuite.java b/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaKafkaRDDSuite.java
new file mode 100644
index 000000000000..e020e436518f
--- /dev/null
+++ b/external/kafka-v09/src/test/java/org/apache/spark/streaming/kafka/v09/JavaKafkaRDDSuite.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import scala.Tuple2;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+public class JavaKafkaRDDSuite implements Serializable {
+ private transient JavaSparkContext sc = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
+
+ @Before
+ public void setUp() {
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ sc = new JavaSparkContext(sparkConf);
+ }
+
+ @After
+ public void tearDown() {
+ if (sc != null) {
+ sc.stop();
+ sc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
+ }
+
+ @Test
+ public void testKafkaRDD() throws InterruptedException {
+ String topic1 = "topic1_testKafkaRDD";
+ String topic2 = "topic2_testKafkaRDD";
+
+ createTopicAndSendData(topic1);
+ createTopicAndSendData(topic2);
+
+ Map<String, String> kafkaParams = new HashMap<>();
+ kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestUtils.brokerAddress());
+ kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaParams.put("spark.kafka.poll.time", "1000");
+
+ OffsetRange[] offsetRanges = {
+ OffsetRange.create(topic1, 0, 0, 1),
+ OffsetRange.create(topic2, 0, 0, 1)
+ };
+
+ JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ kafkaParams,
+ offsetRanges
+ ).map(
+ new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> kv) {
+ return kv._2();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
+ return consumerRecord.value();
+ }
+ }
+ );
+
+ JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+ sc,
+ String.class,
+ String.class,
+ String.class,
+ kafkaParams,
+ offsetRanges,
+ new Function<ConsumerRecord<String, String>, String>() {
+ @Override
+ public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
+ return consumerRecord.value();
+ }
+ }
+ );
+
+ // just making sure the java user apis work; the scala tests handle logic corner cases
+ long count1 = rdd1.count();
+ long count2 = rdd2.count();
+ long count3 = rdd3.count();
+ Assert.assertTrue(count1 > 0);
+ Assert.assertEquals(count1, count2);
+ Assert.assertEquals(count1, count3);
+ }
+
+ private String[] createTopicAndSendData(String topic) {
+ String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
+ return data;
+ }
+}
diff --git a/external/kafka-v09/src/test/resources/log4j.properties b/external/kafka-v09/src/test/resources/log4j.properties
new file mode 100644
index 000000000000..75e3b53a093f
--- /dev/null
+++ b/external/kafka-v09/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaStreamSuite.scala b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaStreamSuite.scala
new file mode 100644
index 000000000000..cb4dab6a2cf4
--- /dev/null
+++ b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/DirectKafkaStreamSuite.scala
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerConfig }
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{ StreamingListenerBatchCompleted, StreamingListenerBatchStarted, StreamingListenerBatchSubmitted, StreamingListener }
+import org.apache.spark.streaming.{ Time, Milliseconds, StreamingContext }
+import org.apache.spark.util.Utils
+import org.apache.spark.{ SparkContext, SparkConf, Logging, SparkFunSuite }
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{ BeforeAndAfter, BeforeAndAfterAll }
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+class DirectKafkaStreamSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with Eventually
+ with Logging {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ private var sc: SparkContext = _
+ private var ssc: StreamingContext = _
+ private var testDir: File = _
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ sc = null
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testDir != null) {
+ Utils.deleteRecursively(testDir)
+ }
+ }
+
+ test("basic stream receiving with multiple topics and earliest starting offset") {
+ val topics = Set("new_basic1", "new_basic2", "new_basic3")
+ val data = Map("a" -> 7, "b" -> 9)
+ topics.foreach { t =>
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
+ }
+ val totalSent = data.values.sum * topics.size
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "1000")
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, kafkaParams, topics)
+ }
+
+ val allReceived =
+ new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+
+ // hold a reference to the current offset ranges, so it can be used downstream
+ var offsetRanges = Array[OffsetRange]()
+
+ stream.transform { rdd =>
+ // Get the offset ranges in the RDD
+ offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd
+ }.foreachRDD { rdd =>
+ for (o <- offsetRanges) {
+ log.info(s"${rdd.id} | ${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+ val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+ // For each partition, get size of the range in the partition,
+ // and the number of items in the partition
+ val off = offsetRanges(i)
+ val all = iter.toSeq
+ val partSize = all.size
+ val rangeSize = off.untilOffset - off.fromOffset
+ Iterator((partSize, rangeSize))
+ }.collect
+
+ // Verify whether number of elements in each partition
+ // matches with the corresponding offset range
+ collected.foreach {
+ case (partSize, rangeSize) =>
+ assert(partSize === rangeSize, "offset ranges are wrong")
+ }
+ }
+ stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+ }
+ ssc.stop()
+ }
+
+ test("receiving from latest starting offset") {
+ val topic = "new_latest"
+ val topicPartition = new TopicPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "100")
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestOffsets(Set(topicPartition)).get(topicPartition).getOrElse(0)
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() > 3)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, kafkaParams, Set(topic))
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest"
+ )
+
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(collectedData.contains("b"))
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ test("creating stream by offset") {
+ val topic = "new_offset"
+ val topicPartition = new TopicPartition(topic, 0)
+ val data = Map("a" -> 10)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "100")
+ val kc = new KafkaCluster(kafkaParams)
+ def getLatestOffset(): Long = {
+ kc.getLatestOffsets(Set(topicPartition)).get(topicPartition).getOrElse(0)
+ }
+
+ // Send some initial messages before starting context
+ kafkaTestUtils.sendMessages(topic, data)
+ eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ assert(getLatestOffset() >= 10)
+ }
+ val offsetBeforeStart = getLatestOffset()
+
+ // Setup context and kafka stream with largest offset
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String, String](
+ ssc, kafkaParams, Map(topicPartition -> 11L),
+ (m: ConsumerRecord[String, String]) => m.value())
+ }
+ assert(
+ stream.asInstanceOf[DirectKafkaInputDStream[_, _, _]]
+ .fromOffsets(topicPartition) >= offsetBeforeStart,
+ "Start offset not from latest")
+
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+ ssc.start()
+ val newData = Map("b" -> 10)
+ kafkaTestUtils.sendMessages(topic, newData)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(collectedData.contains("b"))
+ }
+ assert(!collectedData.contains("a"))
+ }
+
+ // Test to verify the offset ranges can be recovered from the checkpoints
+ test("offset recovery") {
+ val topic = "new_recovery"
+ kafkaTestUtils.createTopic(topic)
+ testDir = Utils.createTempDir()
+
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "1000")
+
+ // Send data to Kafka and wait for it to be received
+ def sendDataAndWaitForReceive(data: Seq[Int]) {
+ val strings = data.map { _.toString }
+ kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1 }.toMap)
+ eventually(timeout(60 seconds), interval(200 milliseconds)) {
+ assert(strings.forall {
+ DirectKafkaStreamSuite.collectedData.contains
+ })
+ }
+ }
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val kafkaStream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, kafkaParams, Set(topic))
+ }
+ val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+ val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+ Some(values.sum + state.getOrElse(0))
+ }
+ ssc.checkpoint(testDir.getAbsolutePath)
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ DirectKafkaStreamSuite.collectedData.appendAll(data)
+ }
+
+ // This is to ensure all the data is eventually received only once
+ stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+ rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+ }
+ ssc.start()
+
+ // Send some data and wait for them to be received
+ for (i <- (1 to 10).grouped(4)) {
+ sendDataAndWaitForReceive(i)
+ }
+
+ // Verify that offset ranges were generated
+ val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+ assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+ assert(
+ offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+ "starting offset not zero"
+ )
+ ssc.stop()
+ logInfo("====== RESTARTING ========")
+
+ // Recover context from checkpoints
+ ssc = new StreamingContext(testDir.getAbsolutePath)
+ val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+ // Verify offset ranges have been recovered
+ val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+ assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+ val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+ assert(
+ recoveredOffsetRanges.forall { or =>
+ earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+ },
+ "Recovered ranges are not the same as the ones generated"
+ )
+ // Restart context, send more data, and verify the total at the end.
+ // If the total is right, that means each record has been received only once.
+ ssc.start()
+ sendDataAndWaitForReceive(11 to 20)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+ }
+ ssc.stop()
+ }
+
+ test("Direct Kafka stream report input information") {
+ val topic = "new_report-test"
+ val data = Map("a" -> 7, "b" -> 9)
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, data)
+
+ val totalSent = data.values.sum
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "1000")
+
+ import DirectKafkaStreamSuite._
+ ssc = new StreamingContext(sparkConf, Milliseconds(200))
+ val collector = new InputInfoCollector
+ ssc.addStreamingListener(collector)
+
+ val stream = withClue("Error creating direct stream") {
+ KafkaUtils.createDirectStream[String, String](
+ ssc, kafkaParams, Set(topic))
+ }
+
+ val allReceived =
+ new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+
+ stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+ ssc.start()
+ eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+ assert(allReceived.size === totalSent,
+ "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+
+ // Calculate all the record number collected in the StreamingListener.
+ assert(collector.numRecordsSubmitted.get() === totalSent)
+ assert(collector.numRecordsStarted.get() === totalSent)
+ assert(collector.numRecordsCompleted.get() === totalSent)
+ }
+ ssc.stop()
+ }
+
+ test("using rate controller") {
+ val topic = "new_backpressure"
+ val topicPartition = new TopicPartition(topic, 0)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "1000")
+
+ val batchIntervalMilliseconds = 100
+ val estimator = new ConstantEstimator(100)
+ val messageKeys = (1 to 200).map(_.toString)
+ val messages = messageKeys.map((_, 1)).toMap
+
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ val kc = new KafkaCluster(kafkaParams)
+ val messageHandler = (mmd: ConsumerRecord[String, String]) => (mmd.key(), mmd.value())
+ val m = kc.getEarliestOffsets(Set(topicPartition))
+
+ new DirectKafkaInputDStream[String, String, (String, String)](
+ ssc, kafkaParams, m, messageHandler) {
+ override protected[streaming] val rateController =
+ Some(new DirectKafkaRateController(id, estimator))
+ }
+ }
+
+ val collectedData =
+ new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+
+ // Used for assertion failure messages.
+ def dataToString: String =
+ collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ collectedData += data
+ }
+
+ ssc.start()
+
+ // Try different rate limits.
+ // Send data to Kafka and wait for arrays of data to appear matching the rate.
+ Seq(100, 50, 20).foreach { rate =>
+ collectedData.clear() // Empty this buffer on each pass.
+ estimator.updateRate(rate) // Set a new rate.
+ // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+ val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+ kafkaTestUtils.sendMessages(topic, messages)
+ eventually(timeout(10.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+ // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+ // Funky "-" in message makes the complete assertion message read better.
+ assert(collectedData.exists(_.size == expectedSize),
+ s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+ }
+ }
+
+ ssc.stop()
+ }
+
+ /** Get the generated offset ranges from the DirectKafkaStream */
+ private def getOffsetRanges[K, V](kafkaStream: DStream[(K, V)]):
+ Seq[(Time, Array[OffsetRange])] = {
+ kafkaStream.generatedRDDs.mapValues { rdd =>
+ rdd.asInstanceOf[KafkaRDD[K, V, (K, V)]].offsetRanges
+ }.toSeq.sortBy { _._1 }
+ }
+}
+
+object DirectKafkaStreamSuite {
+ val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+ @volatile var total = -1L
+
+ class InputInfoCollector extends StreamingListener {
+ val numRecordsSubmitted = new AtomicLong(0L)
+ val numRecordsStarted = new AtomicLong(0L)
+ val numRecordsCompleted = new AtomicLong(0L)
+
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+ numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
+ }
+
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+ numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
+ }
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
+ }
+ }
+}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+ extends RateEstimator {
+
+ def updateRate(newRate: Long): Unit = {
+ rate = newRate
+ }
+
+ def compute(
+ time: Long,
+ elements: Long,
+ processingDelay: Long,
+ schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
diff --git a/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaClusterSuite.scala b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaClusterSuite.scala
new file mode 100644
index 000000000000..7b7c16456468
--- /dev/null
+++ b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaClusterSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfterAll
+
+import scala.util.Random
+
+class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
+ private val topic = "new_kcsuitetopic" + Random.nextInt(10000)
+ private val topicPartition = new TopicPartition(topic, 0)
+ private var newKc: KafkaCluster[_, _] = null
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ override def beforeAll() {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer")
+ newKc = new KafkaCluster(kafkaParams)
+ }
+
+ override def afterAll() {
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ test("leader offset apis") {
+ val earliest = newKc.getEarliestOffsets(Set(topicPartition))
+ assert(earliest(topicPartition) === 0, "didn't get earliest")
+
+ val latest = newKc.getLatestOffsets(Set(topicPartition))
+ assert(latest(topicPartition) === 1, "didn't get latest")
+ }
+
+}
diff --git a/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDSuite.scala b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDSuite.scala
new file mode 100644
index 000000000000..9a296f0f33ac
--- /dev/null
+++ b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaRDDSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+import org.apache.spark._
+import org.scalatest.BeforeAndAfterAll
+
+import scala.util.Random
+
+class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ private val sparkConf = new SparkConf().setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ private var sc: SparkContext = _
+
+ override def beforeAll {
+ sc = new SparkContext(sparkConf)
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+ }
+
+ override def afterAll {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ test("basic usage") {
+ val topic = s"new_topicbasic-${Random.nextInt}"
+ kafkaTestUtils.createTopic(topic)
+ val messages = Array("the", "quick", "brown", "fox")
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
+ ConsumerConfig.GROUP_ID_CONFIG -> s"test-consumer-${Random.nextInt}",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "10000")
+
+ val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+ val rdd = KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, offsetRanges)
+
+ val received = rdd.map(_._2).collect.toSet
+ assert(received === messages.toSet)
+
+ // size-related method optimizations return sane results
+ assert(rdd.count === messages.size)
+ assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+ assert(!rdd.isEmpty)
+ assert(rdd.take(1).size === 1)
+ assert(rdd.take(1).head._2 === messages.head)
+ assert(rdd.take(messages.size + 10).size === messages.size)
+
+ val emptyRdd = KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
+
+ assert(emptyRdd.isEmpty)
+
+ // invalid offset ranges throw exceptions
+ val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+ intercept[SparkException] {
+ KafkaUtils.createRDD[String, String](
+ sc, kafkaParams, badRanges)
+ }
+ }
+
+ test("iterator boundary conditions") {
+ // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
+ val topic = s"new_topicboundary-${Random.nextInt}"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ kafkaTestUtils.createTopic(topic)
+
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaTestUtils.brokerAddress,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
+ ConsumerConfig.GROUP_ID_CONFIG -> s"test-consumer-${Random.nextInt}",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "1000")
+
+ val kc = new KafkaCluster(kafkaParams)
+
+ // this is the "lots of messages" case
+ kafkaTestUtils.sendMessages(topic, sent)
+ val sentCount = sent.values.sum
+
+ // rdd defined from leaders after sending messages, should get the number sent
+ val rdd = getRdd(kc, Set(topic))
+
+ assert(rdd.isDefined)
+
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
+
+ assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+ assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+ val rangesMap = ranges.map(o => new TopicPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+ // make sure consumer offsets are committed before the next getRdd call
+ kc.setConsumerOffsets(rangesMap)
+
+ // this is the "0 messages" case
+ val rdd2 = getRdd(kc, Set(topic))
+ // shouldn't get anything, since message is sent after rdd was defined
+ val sentOnlyOne = Map("d" -> 1)
+
+ kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+
+ assert(rdd2.isDefined)
+ assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
+
+ // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
+ val rdd3 = getRdd(kc, Set(topic))
+ // send lots of messages after rdd was defined, they shouldn't show up
+ kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
+
+ assert(rdd3.isDefined)
+ assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
+
+ }
+
+ // Get an RDD from the committed consumer offsets (or the earliest, if none) up to the latest offsets
+ private def getRdd(kc: KafkaCluster[_, _], topics: Set[String]) = {
+ val groupId = kc.kafkaParams("group.id")
+ val topicPartitions = kc.getPartitions(topics)
+ val consumerOffsets = try {
+ kc.getCommittedOffsets(topicPartitions)
+ } catch {
+ case e: SparkException => kc.getEarliestOffsets(topicPartitions)
+ }
+ val latestOffsets = kc.getLatestOffsets(topicPartitions)
+
+ val offsetRanges = consumerOffsets.map { case (tp: TopicPartition, fromOffset: Long) =>
+ OffsetRange(tp.topic, tp.partition, fromOffset, latestOffsets(tp))
+ }.toArray
+
+ Option(KafkaUtils.createRDD[String, String, String](
+ sc, kc.kafkaParams, offsetRanges,
+ (cr: ConsumerRecord[String, String]) => s"${cr.offset()} ${cr.value()}"))
+ }
+
+}
diff --git a/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaUtilsSSLSuite.scala b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaUtilsSSLSuite.scala
new file mode 100644
index 000000000000..4ffb7bd53c90
--- /dev/null
+++ b/external/kafka-v09/src/test/scala/org/apache/spark/streaming/kafka/v09/KafkaUtilsSSLSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.v09
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.spark.{SparkContext, SparkConf, SparkFunSuite}
+import org.scalatest.BeforeAndAfterAll
+
+class KafkaUtilsSSLSuite extends SparkFunSuite with BeforeAndAfterAll {
+ private var sc: SparkContext = _
+
+ private val pathToKeyStore = "/path/to/ssl_keystore"
+ private val pathToTrustStore = "/path/to/ssl_truststore"
+ private val keystorePasswd = "keystore_secret_pass"
+ private val truststorePasswd = "truststore_secret_pass"
+ private val keyPasswd = "key_secret_pass"
+
+ private val sparkSslProperties = Map[String, String] (
+ "spark.ssl.kafka.enabled" -> "true",
+ "spark.ssl.kafka.keyStore" -> pathToKeyStore,
+ "spark.ssl.kafka.keyStorePassword" -> keystorePasswd,
+ "spark.ssl.kafka.trustStore" -> pathToTrustStore,
+ "spark.ssl.kafka.trustStorePassword" -> truststorePasswd,
+ "spark.ssl.kafka.keyPassword" -> keyPasswd
+ )
+
+ private val kafkaSslProperties = Map[String, String] (
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> pathToKeyStore,
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> keystorePasswd,
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> pathToTrustStore,
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> truststorePasswd,
+ SslConfigs.SSL_KEY_PASSWORD_CONFIG -> keyPasswd
+ )
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+
+ override def beforeAll {
+ sparkConf.setAll(sparkSslProperties)
+ sc = new SparkContext(sparkConf)
+ }
+
+ override def afterAll {
+ if (sc != null) {
+ sc.stop
+ sc = null
+ }
+ }
+
+ test("Check adding SSL properties to Kafka parameters") {
+ val kafkaParams = Map(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+ ConsumerConfig.GROUP_ID_CONFIG -> "test-consumer",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
+ "org.apache.kafka.common.serialization.StringDeserializer",
+ "spark.kafka.poll.time" -> "100")
+
+ val kafkaParamsWithSSL = KafkaUtils.addSSLOptions(kafkaParams, sc)
+
+ kafkaSslProperties.foreach {
+ case (k, v) => assert(kafkaParamsWithSSL.get(k).get.toString == v)
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 234fd5dea1a6..f180fead7d56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
repl
launcher
external/kafka
+ external/kafka-v09
external/kafka-assembly