@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -32,13 +31,10 @@ import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingCont
import org.apache.spark.streaming.dstream._

/**
-* :: Experimental ::
* object for constructing Kafka streams and RDDs
*/
-@Experimental
object KafkaUtils extends Logging {
/**
-* :: Experimental ::
* Scala constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
@@ -52,7 +48,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createRDD[K, V](
sc: SparkContext,
kafkaParams: ju.Map[String, Object],
@@ -75,7 +70,6 @@
}

/**
-* :: Experimental ::
* Java constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
@@ -89,7 +83,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createRDD[K, V](
jsc: JavaSparkContext,
kafkaParams: ju.Map[String, Object],
@@ -101,7 +94,6 @@ object KafkaUtils extends Logging {
}

/**
-* :: Experimental ::
* Scala constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
@@ -114,7 +106,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
@@ -125,7 +116,6 @@ object KafkaUtils extends Logging {
}

/**
-* :: Experimental ::
* Scala constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -137,7 +127,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
@@ -148,7 +137,6 @@ object KafkaUtils extends Logging {
}

/**
-* :: Experimental ::
* Java constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -158,7 +146,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
@@ -170,7 +157,6 @@ object KafkaUtils extends Logging {
}

/**
-* :: Experimental ::
* Java constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
@@ -182,7 +168,6 @@ object KafkaUtils extends Logging {
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
-@Experimental
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
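
For context on the APIs whose Experimental markers this diff removes, below is a minimal usage sketch of KafkaUtils.createRDD (batch, with offsets fixed in advance for exactly-once semantics) and KafkaUtils.createDirectStream (one RDD partition per Kafka topic/partition), assuming the spark-streaming-kafka-0-10 package. The broker address, topic name, group id, and offset range are placeholder values, not taken from this PR.

import java.{util => ju}
import scala.collection.JavaConverters._

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object KafkaUtilsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder consumer settings; adjust for a real cluster.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val jKafkaParams: ju.Map[String, Object] = kafkaParams.asJava

    val sc = new SparkContext("local[2]", "kafka-utils-sketch")

    // Batch interface: the offset range is specified up front, so the same
    // data can be re-read deterministically for exactly-once processing.
    val ranges = Array(OffsetRange("example-topic", 0, 0L, 100L))
    val rdd = KafkaUtils.createRDD[String, String](
      sc, jKafkaParams, ranges, LocationStrategies.PreferConsistent)
    println(s"Batch read ${rdd.count()} records")

    // Streaming interface: each given Kafka topic/partition corresponds to
    // one partition of the resulting DStream's RDDs.
    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams))
    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}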